// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
// vim: ts=8 sw=2 smarttab

#include <boost/utility/string_ref.hpp>

#include "common/ceph_json.h"
#include "common/RWLock.h"
#include "common/RefCountedObj.h"
#include "common/WorkQueue.h"
#include "common/Throttle.h"
#include "common/errno.h"

#include "rgw_common.h"
#include "rgw_rados.h"
#include "rgw_sync.h"
#include "rgw_data_sync.h"
#include "rgw_rest_conn.h"
#include "rgw_cr_rados.h"
#include "rgw_cr_rest.h"
#include "rgw_http_client.h"
#include "rgw_bucket.h"
#include "rgw_metadata.h"
#include "rgw_sync_module.h"
#include "rgw_sync_log_trim.h"

#include "cls/lock/cls_lock_client.h"

#include "auth/Crypto.h"

#include <boost/asio/yield.hpp>

#define dout_subsys ceph_subsys_rgw

#undef dout_prefix
#define dout_prefix (*_dout << "data sync: ")

static string datalog_sync_status_oid_prefix = "datalog.sync-status";
static string datalog_sync_status_shard_prefix = "datalog.sync-status.shard";
static string datalog_sync_full_sync_index_prefix = "data.full-sync.index";
static string bucket_status_oid_prefix = "bucket.sync-status";

class RGWSyncDebugLogger {
  CephContext *cct;
  string prefix;

  bool ended;

public:
  RGWSyncDebugLogger(CephContext *_cct, const string& source_zone,
                     const string& sync_type, const string& sync_stage,
                     const string& resource, bool log_start = true) {
    init(_cct, source_zone, sync_type, sync_stage, resource, log_start);
  }
  RGWSyncDebugLogger() : cct(NULL), ended(false) {}
  ~RGWSyncDebugLogger();

  void init(CephContext *_cct, const string& source_zone,
            const string& sync_type, const string& sync_stage,
            const string& resource, bool log_start = true);
  void log(const string& state);
  void finish(int status);
};

void RGWSyncDebugLogger::init(CephContext *_cct, const string& source_zone,
                              const string& sync_type, const string& sync_section,
                              const string& resource, bool log_start)
{
  cct = _cct;
  ended = false;
  string zone_str = source_zone.substr(0, 8);
  prefix = "Sync:" + zone_str + ":" + sync_type + ":" + sync_section + ":" + resource;
  if (log_start) {
    log("start");
  }
}

RGWSyncDebugLogger::~RGWSyncDebugLogger()
{
  if (!ended) {
    log("finish");
  }
}

void RGWSyncDebugLogger::log(const string& state)
{
  ldout(cct, 5) << prefix << ":" << state << dendl;
}

void RGWSyncDebugLogger::finish(int status)
{
  ended = true;
  ldout(cct, 5) << prefix << ":" << "finish r=" << status << dendl;
}

class RGWDataSyncDebugLogger : public RGWSyncDebugLogger {
public:
  RGWDataSyncDebugLogger() {}
  RGWDataSyncDebugLogger(RGWDataSyncEnv *sync_env, const string& sync_section,
                         const string& resource, bool log_start = true) {
    init(sync_env, sync_section, resource, log_start);
  }
  void init(RGWDataSyncEnv *sync_env, const string& sync_section,
            const string& resource, bool log_start = true) {
    RGWSyncDebugLogger::init(sync_env->cct, sync_env->source_zone, "data", sync_section, resource, log_start);
  }
};

void rgw_datalog_info::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("num_objects", num_shards, obj);
}

void rgw_datalog_entry::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("key", key, obj);
  utime_t ut;
  JSONDecoder::decode_json("timestamp", ut, obj);
  timestamp = ut.to_real_time();
}

void rgw_datalog_shard_data::decode_json(JSONObj *obj) {
  JSONDecoder::decode_json("marker", marker, obj);
  JSONDecoder::decode_json("truncated", truncated, obj);
  JSONDecoder::decode_json("entries", entries, obj);
}

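/* The decoders above consume the JSON that a peer zone's /admin/log endpoint
 * returns for a data log shard. Shape inferred from the decode_json() calls;
 * the values below are purely illustrative:
 *
 *   {
 *     "marker": "1_1554137000.123456_42.1",
 *     "truncated": false,
 *     "entries": [
 *       { "key": "testbucket:<bucket_instance_id>:7", "timestamp": "..." }
 *     ]
 *   }
 */
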
class RGWReadDataSyncStatusMarkersCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  RGWDataSyncEnv *env;
  const int num_shards;
  int shard_id{0};

  map<uint32_t, rgw_data_sync_marker>& markers;

 public:
  RGWReadDataSyncStatusMarkersCR(RGWDataSyncEnv *env, int num_shards,
                                 map<uint32_t, rgw_data_sync_marker>& markers)
    : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS),
      env(env), num_shards(num_shards), markers(markers)
  {}
  bool spawn_next() override;
};

bool RGWReadDataSyncStatusMarkersCR::spawn_next()
{
  if (shard_id >= num_shards) {
    return false;
  }
  using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
  spawn(new CR(env->async_rados, env->store,
               rgw_raw_obj(env->store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id)),
               &markers[shard_id]),
        false);
  shard_id++;
  return true;
}

class RGWReadDataSyncRecoveringShardsCR : public RGWShardCollectCR {
  static constexpr int MAX_CONCURRENT_SHARDS = 16;

  RGWDataSyncEnv *env;

  uint64_t max_entries;
  int num_shards;
  int shard_id{0};

  string marker;
  map<int, std::set<std::string>> &entries_map;

 public:
  RGWReadDataSyncRecoveringShardsCR(RGWDataSyncEnv *env, uint64_t _max_entries, int _num_shards,
                                    map<int, std::set<std::string>>& _entries_map)
    : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS), env(env),
      max_entries(_max_entries), num_shards(_num_shards), entries_map(_entries_map)
  {}
  bool spawn_next() override;
};

bool RGWReadDataSyncRecoveringShardsCR::spawn_next()
{
  if (shard_id >= num_shards)
    return false;

  string error_oid = RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id) + ".retry";
  spawn(new RGWRadosGetOmapKeysCR(env->store, rgw_raw_obj(env->store->get_zone_params().log_pool, error_oid),
                                  marker, &entries_map[shard_id], max_entries), false);

  ++shard_id;
  return true;
}

class RGWReadDataSyncStatusCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  rgw_data_sync_status *sync_status;

public:
  RGWReadDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
                                 rgw_data_sync_status *_status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), sync_status(_status)
  {}
  int operate() override;
};

int RGWReadDataSyncStatusCoroutine::operate()
{
  reenter(this) {
    // read sync info
    using ReadInfoCR = RGWSimpleRadosReadCR<rgw_data_sync_info>;
    yield {
      bool empty_on_enoent = false; // fail on ENOENT
      call(new ReadInfoCR(sync_env->async_rados, sync_env->store,
                          rgw_raw_obj(sync_env->store->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
                          &sync_status->sync_info, empty_on_enoent));
    }
    if (retcode < 0) {
      ldout(sync_env->cct, 4) << "failed to read sync status info with "
                              << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    // read shard markers
    using ReadMarkersCR = RGWReadDataSyncStatusMarkersCR;
    yield call(new ReadMarkersCR(sync_env, sync_status->sync_info.num_shards,
                                 sync_status->sync_markers));
    if (retcode < 0) {
      ldout(sync_env->cct, 4) << "failed to read sync status markers with "
                              << cpp_strerror(retcode) << dendl;
      return set_cr_error(retcode);
    }
    return set_cr_done();
  }
  return 0;
}

class RGWReadRemoteDataLogShardInfoCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWRESTReadResource *http_op;

  int shard_id;
  RGWDataChangesLogInfo *shard_info;

public:
  RGWReadRemoteDataLogShardInfoCR(RGWDataSyncEnv *_sync_env,
                                  int _shard_id, RGWDataChangesLogInfo *_shard_info)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      http_op(NULL),
      shard_id(_shard_id),
      shard_info(_shard_info) {
  }

  ~RGWReadRemoteDataLogShardInfoCR() override {
    if (http_op) {
      http_op->put();
    }
  }

  int operate() override {
    reenter(this) {
      yield {
        char buf[16];
        snprintf(buf, sizeof(buf), "%d", shard_id);
        rgw_http_param_pair pairs[] = { { "type", "data" },
                                        { "id", buf },
                                        { "info", NULL },
                                        { NULL, NULL } };

        string p = "/admin/log/";

        http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);

        http_op->set_user_info((void *)stack);

        int ret = http_op->aio_read();
        if (ret < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        int ret = http_op->wait(shard_info);
        if (ret < 0) {
          return set_cr_error(ret);
        }
        return set_cr_done();
      }
    }
    return 0;
  }
};

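/* The shard-info coroutine above amounts to a single GET on the source zone:
 *   GET /admin/log/?type=data&id=<shard_id>&info
 * with the reply decoded into RGWDataChangesLogInfo (the shard's current
 * marker and last_update time, as consumed by RGWInitDataSyncStatusCoroutine
 * below). */
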
struct read_remote_data_log_response {
  string marker;
  bool truncated;
  list<rgw_data_change_log_entry> entries;

  read_remote_data_log_response() : truncated(false) {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("marker", marker, obj);
    JSONDecoder::decode_json("truncated", truncated, obj);
    JSONDecoder::decode_json("entries", entries, obj);
  }
};

class RGWReadRemoteDataLogShardCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWRESTReadResource *http_op;

  int shard_id;
  string *pmarker;
  list<rgw_data_change_log_entry> *entries;
  bool *truncated;

  read_remote_data_log_response response;

public:
  RGWReadRemoteDataLogShardCR(RGWDataSyncEnv *_sync_env,
                              int _shard_id, string *_pmarker, list<rgw_data_change_log_entry> *_entries, bool *_truncated)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      http_op(NULL),
      shard_id(_shard_id),
      pmarker(_pmarker),
      entries(_entries),
      truncated(_truncated) {
  }
  ~RGWReadRemoteDataLogShardCR() override {
    if (http_op) {
      http_op->put();
    }
  }

  int operate() override {
    reenter(this) {
      yield {
        char buf[16];
        snprintf(buf, sizeof(buf), "%d", shard_id);
        rgw_http_param_pair pairs[] = { { "type", "data" },
                                        { "id", buf },
                                        { "marker", pmarker->c_str() },
                                        { "extra-info", "true" },
                                        { NULL, NULL } };

        string p = "/admin/log/";

        http_op = new RGWRESTReadResource(sync_env->conn, p, pairs, NULL, sync_env->http_manager);

        http_op->set_user_info((void *)stack);

        int ret = http_op->aio_read();
        if (ret < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to read from " << p << dendl;
          log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
          return set_cr_error(ret);
        }

        return io_block(0);
      }
      yield {
        int ret = http_op->wait(&response);
        if (ret < 0) {
          return set_cr_error(ret);
        }
        entries->clear();
        entries->swap(response.entries);
        *pmarker = response.marker;
        *truncated = response.truncated;
        return set_cr_done();
      }
    }
    return 0;
  }
};

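/* RGWReadRemoteDataLogShardCR is the incremental-sync counterpart of the
 * info coroutine: it lists log entries rather than shard state, i.e.
 *   GET /admin/log/?type=data&id=<shard_id>&marker=<pos>&extra-info=true
 * and hands back the advanced marker, the entries, and the truncated flag. */
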
class RGWReadRemoteDataLogInfoCR : public RGWShardCollectCR {
  RGWDataSyncEnv *sync_env;

  int num_shards;
  map<int, RGWDataChangesLogInfo> *datalog_info;

  int shard_id;
#define READ_DATALOG_MAX_CONCURRENT 10

public:
  RGWReadRemoteDataLogInfoCR(RGWDataSyncEnv *_sync_env,
                             int _num_shards,
                             map<int, RGWDataChangesLogInfo> *_datalog_info)
    : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
      sync_env(_sync_env), num_shards(_num_shards),
      datalog_info(_datalog_info), shard_id(0) {}
  bool spawn_next() override;
};

bool RGWReadRemoteDataLogInfoCR::spawn_next() {
  if (shard_id >= num_shards) {
    return false;
  }
  spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, shard_id, &(*datalog_info)[shard_id]), false);
  shard_id++;
  return true;
}

class RGWListRemoteDataLogShardCR : public RGWSimpleCoroutine {
  RGWDataSyncEnv *sync_env;
  RGWRESTReadResource *http_op;

  int shard_id;
  string marker;
  uint32_t max_entries;
  rgw_datalog_shard_data *result;

public:
  RGWListRemoteDataLogShardCR(RGWDataSyncEnv *env, int _shard_id,
                              const string& _marker, uint32_t _max_entries,
                              rgw_datalog_shard_data *_result)
    : RGWSimpleCoroutine(env->store->ctx()), sync_env(env), http_op(NULL),
      shard_id(_shard_id), marker(_marker), max_entries(_max_entries), result(_result) {}

  int send_request() override {
    RGWRESTConn *conn = sync_env->conn;
    RGWRados *store = sync_env->store;

    char buf[32];
    snprintf(buf, sizeof(buf), "%d", shard_id);

    char max_entries_buf[32];
    snprintf(max_entries_buf, sizeof(max_entries_buf), "%d", (int)max_entries);

    const char *marker_key = (marker.empty() ? "" : "marker");

    rgw_http_param_pair pairs[] = { { "type", "data" },
                                    { "id", buf },
                                    { "max-entries", max_entries_buf },
                                    { marker_key, marker.c_str() },
                                    { NULL, NULL } };

    string p = "/admin/log/";

    http_op = new RGWRESTReadResource(conn, p, pairs, NULL, sync_env->http_manager);
    http_op->set_user_info((void *)stack);

    int ret = http_op->aio_read();
    if (ret < 0) {
      ldout(store->ctx(), 0) << "ERROR: failed to read from " << p << dendl;
      log_error() << "failed to send http operation: " << http_op->to_str() << " ret=" << ret << std::endl;
      http_op->put();
      return ret;
    }

    return 0;
  }

  int request_complete() override {
    int ret = http_op->wait(result);
    http_op->put();
    if (ret < 0 && ret != -ENOENT) {
      ldout(sync_env->store->ctx(), 0) << "ERROR: failed to list remote datalog shard, ret=" << ret << dendl;
      return ret;
    }
    return 0;
  }
};

class RGWListRemoteDataLogCR : public RGWShardCollectCR {
  RGWDataSyncEnv *sync_env;

  map<int, string> shards;
  int max_entries_per_shard;
  map<int, rgw_datalog_shard_data> *result;

  map<int, string>::iterator iter;
#define READ_DATALOG_MAX_CONCURRENT 10

public:
  RGWListRemoteDataLogCR(RGWDataSyncEnv *_sync_env,
                         map<int, string>& _shards,
                         int _max_entries_per_shard,
                         map<int, rgw_datalog_shard_data> *_result)
    : RGWShardCollectCR(_sync_env->cct, READ_DATALOG_MAX_CONCURRENT),
      sync_env(_sync_env), max_entries_per_shard(_max_entries_per_shard),
      result(_result) {
    shards.swap(_shards);
    iter = shards.begin();
  }
  bool spawn_next() override;
};

bool RGWListRemoteDataLogCR::spawn_next() {
  if (iter == shards.end()) {
    return false;
  }

  spawn(new RGWListRemoteDataLogShardCR(sync_env, iter->first, iter->second, max_entries_per_shard, &(*result)[iter->first]), false);
  ++iter;
  return true;
}

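/* RGWInitDataSyncStatusCoroutine below bootstraps the sync-status objects in
 * the local log pool. Sequence, as implemented in operate(): take a cls_lock
 * on the status object, write rgw_data_sync_info, re-take the lock (the
 * write recreated the object), read every remote shard's current position,
 * persist one rgw_data_sync_marker per shard, flip the state to
 * StateBuildingFullSyncMaps, and unlock. */
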
class RGWInitDataSyncStatusCoroutine : public RGWCoroutine {
  static constexpr uint32_t lock_duration = 30;
  RGWDataSyncEnv *sync_env;
  RGWRados *store;
  const rgw_pool& pool;
  const uint32_t num_shards;

  string sync_status_oid;

  string lock_name;
  string cookie;
  rgw_data_sync_status *status;
  map<int, RGWDataChangesLogInfo> shards_info;
public:
  RGWInitDataSyncStatusCoroutine(RGWDataSyncEnv *_sync_env, uint32_t num_shards,
                                 uint64_t instance_id,
                                 rgw_data_sync_status *status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), store(sync_env->store),
      pool(store->get_zone_params().log_pool),
      num_shards(num_shards), status(status) {
    lock_name = "sync_lock";

    status->sync_info.instance_id = instance_id;

#define COOKIE_LEN 16
    char buf[COOKIE_LEN + 1];

    gen_rand_alphanumeric(cct, buf, sizeof(buf) - 1);
    cookie = buf;

    sync_status_oid = RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone);
  }

  int operate() override {
    int ret;
    reenter(this) {
      using LockCR = RGWSimpleRadosLockCR;
      yield call(new LockCR(sync_env->async_rados, store,
                            rgw_raw_obj{pool, sync_status_oid},
                            lock_name, cookie, lock_duration));
      if (retcode < 0) {
        ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
        return set_cr_error(retcode);
      }
      using WriteInfoCR = RGWSimpleRadosWriteCR<rgw_data_sync_info>;
      yield call(new WriteInfoCR(sync_env->async_rados, store,
                                 rgw_raw_obj{pool, sync_status_oid},
                                 status->sync_info));
      if (retcode < 0) {
        ldout(cct, 0) << "ERROR: failed to write sync status info with " << retcode << dendl;
        return set_cr_error(retcode);
      }

      /* take lock again, we just recreated the object */
      yield call(new LockCR(sync_env->async_rados, store,
                            rgw_raw_obj{pool, sync_status_oid},
                            lock_name, cookie, lock_duration));
      if (retcode < 0) {
        ldout(cct, 0) << "ERROR: failed to take a lock on " << sync_status_oid << dendl;
        return set_cr_error(retcode);
      }

      /* fetch current position in logs */
      yield {
        RGWRESTConn *conn = store->get_zone_conn_by_id(sync_env->source_zone);
        if (!conn) {
          ldout(cct, 0) << "ERROR: connection to zone " << sync_env->source_zone << " does not exist!" << dendl;
          return set_cr_error(-EIO);
        }
        for (uint32_t i = 0; i < num_shards; i++) {
          spawn(new RGWReadRemoteDataLogShardInfoCR(sync_env, i, &shards_info[i]), true);
        }
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          ldout(cct, 0) << "ERROR: failed to read remote data log shards" << dendl;
          return set_state(RGWCoroutine_Error);
        }
        yield;
      }
      yield {
        for (uint32_t i = 0; i < num_shards; i++) {
          RGWDataChangesLogInfo& info = shards_info[i];
          auto& marker = status->sync_markers[i];
          marker.next_step_marker = info.marker;
          marker.timestamp = info.last_update;
          const auto& oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, i);
          using WriteMarkerCR = RGWSimpleRadosWriteCR<rgw_data_sync_marker>;
          spawn(new WriteMarkerCR(sync_env->async_rados, store,
                                  rgw_raw_obj{pool, oid}, marker), true);
        }
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          ldout(cct, 0) << "ERROR: failed to write data sync status markers" << dendl;
          return set_state(RGWCoroutine_Error);
        }
        yield;
      }

      status->sync_info.state = rgw_data_sync_info::StateBuildingFullSyncMaps;
      yield call(new WriteInfoCR(sync_env->async_rados, store,
                                 rgw_raw_obj{pool, sync_status_oid},
                                 status->sync_info));
      if (retcode < 0) {
        ldout(cct, 0) << "ERROR: failed to write sync status info with " << retcode << dendl;
        return set_cr_error(retcode);
      }
      yield call(new RGWSimpleRadosUnlockCR(sync_env->async_rados, store,
                                            rgw_raw_obj{pool, sync_status_oid},
                                            lock_name, cookie));
      return set_cr_done();
    }
    return 0;
  }
};

int RGWRemoteDataLog::read_log_info(rgw_datalog_info *log_info)
{
  rgw_http_param_pair pairs[] = { { "type", "data" },
                                  { NULL, NULL } };

  int ret = sync_env.conn->get_json_resource("/admin/log", pairs, *log_info);
  if (ret < 0) {
    ldout(store->ctx(), 0) << "ERROR: failed to fetch datalog info" << dendl;
    return ret;
  }

  ldout(store->ctx(), 20) << "remote datalog, num_shards=" << log_info->num_shards << dendl;

  return 0;
}

int RGWRemoteDataLog::read_source_log_shards_info(map<int, RGWDataChangesLogInfo> *shards_info)
{
  rgw_datalog_info log_info;
  int ret = read_log_info(&log_info);
  if (ret < 0) {
    return ret;
  }

  return run(new RGWReadRemoteDataLogInfoCR(&sync_env, log_info.num_shards, shards_info));
}

int RGWRemoteDataLog::read_source_log_shards_next(map<int, string> shard_markers, map<int, rgw_datalog_shard_data> *result)
{
  if (store->is_meta_master()) {
    return 0;
  }

  return run(new RGWListRemoteDataLogCR(&sync_env, shard_markers, 1, result));
}

int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger, RGWSyncModuleInstanceRef& _sync_module)
{
  sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger,
                _source_zone, _sync_module);

  if (initialized) {
    return 0;
  }

  int ret = http_manager.set_threaded();
  if (ret < 0) {
    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
    return ret;
  }

  initialized = true;

  return 0;
}

void RGWRemoteDataLog::finish()
{
  stop();
}

int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status)
{
  // cannot run concurrently with run_sync(), so run in a separate manager
  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.set_threaded();
  if (ret < 0) {
    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  ret = crs.run(new RGWReadDataSyncStatusCoroutine(&sync_env_local, sync_status));
  http_manager.stop();
  return ret;
}

int RGWRemoteDataLog::read_recovering_shards(const int num_shards, set<int>& recovering_shards)
{
  // cannot run concurrently with run_sync(), so run in a separate manager
  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.set_threaded();
  if (ret < 0) {
    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  map<int, std::set<std::string>> entries_map;
  uint64_t max_entries{1};
  ret = crs.run(new RGWReadDataSyncRecoveringShardsCR(&sync_env_local, max_entries, num_shards, entries_map));
  http_manager.stop();

  if (ret == 0) {
    for (const auto& entry : entries_map) {
      if (entry.second.size() != 0) {
        recovering_shards.insert(entry.first);
      }
    }
  }

  return ret;
}

int RGWRemoteDataLog::init_sync_status(int num_shards)
{
  rgw_data_sync_status sync_status;
  sync_status.sync_info.num_shards = num_shards;

  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
  int ret = http_manager.set_threaded();
  if (ret < 0) {
    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
    return ret;
  }
  RGWDataSyncEnv sync_env_local = sync_env;
  sync_env_local.http_manager = &http_manager;
  uint64_t instance_id;
  get_random_bytes((char *)&instance_id, sizeof(instance_id));
  ret = crs.run(new RGWInitDataSyncStatusCoroutine(&sync_env_local, num_shards, instance_id, &sync_status));
  http_manager.stop();
  return ret;
}

static string full_data_sync_index_shard_oid(const string& source_zone, int shard_id)
{
  char buf[datalog_sync_full_sync_index_prefix.size() + 1 + source_zone.size() + 1 + 16];
  snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_full_sync_index_prefix.c_str(), source_zone.c_str(), shard_id);
  return string(buf);
}

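/* Given the "%s.%s.%d" format above, a full-sync index shard oid looks like
 * "data.full-sync.index.<source_zone_id>.<shard_id>", e.g.
 * "data.full-sync.index.1b4a62f5-....5" (zone id abbreviated, illustrative). */
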
struct bucket_instance_meta_info {
  string key;
  obj_version ver;
  utime_t mtime;
  RGWBucketInstanceMetadataObject data;

  bucket_instance_meta_info() {}

  void decode_json(JSONObj *obj) {
    JSONDecoder::decode_json("key", key, obj);
    JSONDecoder::decode_json("ver", ver, obj);
    JSONDecoder::decode_json("mtime", mtime, obj);
    JSONDecoder::decode_json("data", data, obj);
  }
};

class RGWListBucketIndexesCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  RGWRados *store;

  rgw_data_sync_status *sync_status;
  int num_shards;

  int req_ret;
  int ret;

  list<string> result;
  list<string>::iterator iter;

  RGWShardedOmapCRManager *entries_index;

  string oid_prefix;

  string path;
  bucket_instance_meta_info meta_info;
  string key;
  string s;
  int i;

  bool failed;

public:
  RGWListBucketIndexesCR(RGWDataSyncEnv *_sync_env,
                         rgw_data_sync_status *_sync_status)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
      store(sync_env->store), sync_status(_sync_status),
      req_ret(0), ret(0), entries_index(NULL), i(0), failed(false) {
    oid_prefix = datalog_sync_full_sync_index_prefix + "." + sync_env->source_zone;
    path = "/admin/metadata/bucket.instance";
    num_shards = sync_status->sync_info.num_shards;
  }
  ~RGWListBucketIndexesCR() override {
    delete entries_index;
  }

  int operate() override {
    reenter(this) {
      yield {
        string entrypoint = string("/admin/metadata/bucket.instance");
        /* FIXME: need a better scaling solution here, requires streaming output */
        call(new RGWReadRESTResourceCR<list<string> >(store->ctx(), sync_env->conn, sync_env->http_manager,
                                                      entrypoint, NULL, &result));
      }
      if (retcode < 0) {
        ldout(sync_env->cct, 0) << "ERROR: failed to fetch metadata for section bucket.instance" << dendl;
        return set_cr_error(retcode);
      }
      entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards,
                                                  store->get_zone_params().log_pool,
                                                  oid_prefix);
      yield; // yield so OmapAppendCRs can start
      for (iter = result.begin(); iter != result.end(); ++iter) {
        ldout(sync_env->cct, 20) << "list metadata: section=bucket.instance key=" << *iter << dendl;

        key = *iter;

        yield {
          rgw_http_param_pair pairs[] = { { "key", key.c_str() },
                                          { NULL, NULL } };

          call(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), sync_env->conn, sync_env->http_manager, path, pairs, &meta_info));
        }

        num_shards = meta_info.data.get_bucket_info().num_shards;
        if (num_shards > 0) {
          for (i = 0; i < num_shards; i++) {
            char buf[16];
            snprintf(buf, sizeof(buf), ":%d", i);
            s = key + buf;
            yield entries_index->append(s, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, i));
          }
        } else {
          yield entries_index->append(key, store->data_log->get_log_shard_id(meta_info.data.get_bucket_info().bucket, -1));
        }
      }
      yield {
        if (!entries_index->finish()) {
          failed = true;
        }
      }
      if (!failed) {
        for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status->sync_markers.begin(); iter != sync_status->sync_markers.end(); ++iter) {
          int shard_id = (int)iter->first;
          rgw_data_sync_marker& marker = iter->second;
          marker.total_entries = entries_index->get_total_entries(shard_id);
          spawn(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
                                                                rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
                                                                marker), true);
        }
      } else {
        yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
                                                        EIO, string("failed to build bucket instances map")));
      }
      while (collect(&ret, NULL)) {
        if (ret < 0) {
          yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data.init", "",
                                                          -ret, string("failed to store sync status: ") + cpp_strerror(-ret)));
          req_ret = ret;
        }
        yield;
      }
      drain_all();
      if (req_ret < 0) {
        yield return set_cr_error(req_ret);
      }
      yield return set_cr_done();
    }
    return 0;
  }
};

#define DATA_SYNC_UPDATE_MARKER_WINDOW 1

class RGWDataSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, string> {
  RGWDataSyncEnv *sync_env;

  string marker_oid;
  rgw_data_sync_marker sync_marker;

  map<string, string> key_to_marker;
  map<string, string> marker_to_key;

  void handle_finish(const string& marker) override {
    map<string, string>::iterator iter = marker_to_key.find(marker);
    if (iter == marker_to_key.end()) {
      return;
    }
    key_to_marker.erase(iter->second);
    reset_need_retry(iter->second);
    marker_to_key.erase(iter);
  }

public:
  RGWDataSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
                              const string& _marker_oid,
                              const rgw_data_sync_marker& _marker)
    : RGWSyncShardMarkerTrack(DATA_SYNC_UPDATE_MARKER_WINDOW),
      sync_env(_sync_env),
      marker_oid(_marker_oid),
      sync_marker(_marker) {}

  RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
    sync_marker.marker = new_marker;
    sync_marker.pos = index_pos;

    ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
    RGWRados *store = sync_env->store;

    return new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
                                                           rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
                                                           sync_marker);
  }

  /*
   * create index from key -> marker, and from marker -> key
   * this is useful so that we can ensure that we only have one
   * entry for any key that is used. This is needed when doing
   * incremental sync of data, and we don't want to run multiple
   * concurrent sync operations for the same bucket shard
   */
  bool index_key_to_marker(const string& key, const string& marker) {
    if (key_to_marker.find(key) != key_to_marker.end()) {
      set_need_retry(key);
      return false;
    }
    key_to_marker[key] = marker;
    marker_to_key[marker] = key;
    return true;
  }

  RGWOrderCallCR *allocate_order_control_cr() {
    return new RGWLastCallerWinsCR(sync_env->cct);
  }
};

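/* Rough sketch of how RGWDataSyncShardCR (further below) drives this tracker
 * during incremental sync; flow paraphrased from that coroutine, not an
 * additional API:
 *
 *   if (marker_tracker->index_key_to_marker(entry.key, log_id)) {
 *     marker_tracker->start(log_id, 0, log_timestamp);
 *     spawn(new RGWDataSyncSingleEntryCR(...)); // finish(log_id) on completion
 *   } else {
 *     // duplicate bucket shard already in flight: need_retry was set, so
 *     // only advance the high marker
 *     marker_tracker->try_update_high_marker(log_id, 0, log_timestamp);
 *   }
 */
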
// ostream wrappers to print buckets without copying strings
struct bucket_str {
  const rgw_bucket& b;
  bucket_str(const rgw_bucket& b) : b(b) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_str& rhs) {
  auto& b = rhs.b;
  if (!b.tenant.empty()) {
    out << b.tenant << '/';
  }
  out << b.name;
  if (!b.bucket_id.empty()) {
    out << ':' << b.bucket_id;
  }
  return out;
}

struct bucket_shard_str {
  const rgw_bucket_shard& bs;
  bucket_shard_str(const rgw_bucket_shard& bs) : bs(bs) {}
};
std::ostream& operator<<(std::ostream& out, const bucket_shard_str& rhs) {
  auto& bs = rhs.bs;
  out << bucket_str{bs.bucket};
  if (bs.shard_id >= 0) {
    out << ':' << bs.shard_id;
  }
  return out;
}

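/* Usage example (illustrative): with tenant "t", bucket "b", bucket id "i"
 * and shard 3,
 *   ldout(cct, 20) << bucket_shard_str{bs} << dendl;
 * prints "t/b:i:3"; an empty tenant, empty bucket id, or negative shard id
 * is simply omitted. */
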
class RGWRunBucketSyncCoroutine : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  rgw_bucket_shard bs;
  RGWBucketInfo bucket_info;
  rgw_bucket_shard_sync_info sync_status;
  RGWMetaSyncEnv meta_sync_env;

  RGWDataSyncDebugLogger logger;
  const std::string status_oid;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;

public:
  RGWRunBucketSyncCoroutine(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs)
    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
      status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)) {
    logger.init(sync_env, "Bucket", bs.get_key());
  }
  ~RGWRunBucketSyncCoroutine() override {
    if (lease_cr) {
      lease_cr->abort();
    }
  }

  int operate() override;
};

class RGWDataSyncSingleEntryCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  string raw_key;
  string entry_marker;

  rgw_bucket_shard bs;

  int sync_status;

  bufferlist md_bl;

  RGWDataSyncShardMarkerTrack *marker_tracker;

  boost::intrusive_ptr<RGWOmapAppend> error_repo;
  bool remove_from_repo;

  set<string> keys;

public:
  RGWDataSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
                           const string& _raw_key, const string& _entry_marker, RGWDataSyncShardMarkerTrack *_marker_tracker,
                           RGWOmapAppend *_error_repo, bool _remove_from_repo)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      raw_key(_raw_key), entry_marker(_entry_marker),
      sync_status(0),
      marker_tracker(_marker_tracker),
      error_repo(_error_repo), remove_from_repo(_remove_from_repo) {
    set_description() << "data sync single entry (source_zone=" << sync_env->source_zone << ") key=" << _raw_key << " entry=" << entry_marker;
  }

  int operate() override {
    reenter(this) {
      do {
        yield {
          int ret = rgw_bucket_parse_bucket_key(sync_env->cct, raw_key,
                                                &bs.bucket, &bs.shard_id);
          if (ret < 0) {
            return set_cr_error(-EIO);
          }
          if (marker_tracker) {
            marker_tracker->reset_need_retry(raw_key);
          }
          call(new RGWRunBucketSyncCoroutine(sync_env, bs));
        }
      } while (marker_tracker && marker_tracker->need_retry(raw_key));

      sync_status = retcode;

      if (sync_status == -ENOENT) {
        // this was added when 'tenant/' was added to datalog entries, because
        // preexisting tenant buckets could never sync and would stay in the
        // error_repo forever
        ldout(sync_env->store->ctx(), 0) << "WARNING: skipping data log entry "
            "for missing bucket " << raw_key << dendl;
        sync_status = 0;
      }

      if (sync_status < 0) {
        // write actual sync failures for 'radosgw-admin sync error list'
        if (sync_status != -EBUSY && sync_status != -EAGAIN) {
          yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", raw_key,
                                                          -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
          if (retcode < 0) {
            ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure: retcode=" << retcode << dendl;
          }
        }
        if (error_repo && !error_repo->append(raw_key)) {
          ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure in error repo: retcode=" << retcode << dendl;
        }
      } else if (error_repo && remove_from_repo) {
        keys = {raw_key};
        yield call(new RGWRadosRemoveOmapKeysCR(sync_env->store, error_repo->get_obj(), keys));
        if (retcode < 0) {
          ldout(sync_env->store->ctx(), 0) << "ERROR: failed to remove omap key from error repo ("
              << error_repo->get_obj() << ") retcode=" << retcode << dendl;
        }
      }
      /* FIXME: what to do in case of error */
      if (marker_tracker && !entry_marker.empty()) {
        /* update marker */
        yield call(marker_tracker->finish(entry_marker));
      }
      if (sync_status == 0) {
        sync_status = retcode;
      }
      if (sync_status < 0) {
        return set_cr_error(sync_status);
      }
      return set_cr_done();
    }
    return 0;
  }
};

#define BUCKET_SHARD_SYNC_SPAWN_WINDOW 20
#define DATA_SYNC_MAX_ERR_ENTRIES 10

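/* BUCKET_SHARD_SYNC_SPAWN_WINDOW caps how many RGWDataSyncSingleEntryCR
 * children one shard coroutine keeps in flight: once num_spawned() exceeds
 * it, the shard coroutine yields in wait_for_child() and collects results
 * before spawning more. DATA_SYNC_MAX_ERR_ENTRIES bounds each pass over the
 * ".retry" error repo during incremental sync. */
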
class RGWDataSyncShardCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;

  rgw_pool pool;

  uint32_t shard_id;
  rgw_data_sync_marker sync_marker;

  std::set<std::string> entries;
  std::set<std::string>::iterator iter;

  string oid;

  RGWDataSyncShardMarkerTrack *marker_tracker;

  list<rgw_data_change_log_entry> log_entries;
  list<rgw_data_change_log_entry>::iterator log_iter;
  bool truncated;

  Mutex inc_lock;
  Cond inc_cond;

  boost::asio::coroutine incremental_cr;
  boost::asio::coroutine full_cr;

  set<string> modified_shards;
  set<string> current_modified;

  set<string>::iterator modified_iter;

  int total_entries;

  int spawn_window;

  bool *reset_backoff;

  set<string> spawned_keys;

  boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
  boost::intrusive_ptr<RGWCoroutinesStack> lease_stack;
  string status_oid;

  string error_oid;
  RGWOmapAppend *error_repo;
  std::set<std::string> error_entries;
  string error_marker;
  int max_error_entries;

  ceph::coarse_real_time error_retry_time;

#define RETRY_BACKOFF_SECS_MIN 60
#define RETRY_BACKOFF_SECS_DEFAULT 60
#define RETRY_BACKOFF_SECS_MAX 600
  uint32_t retry_backoff_secs;

  RGWDataSyncDebugLogger logger;
public:
  RGWDataSyncShardCR(RGWDataSyncEnv *_sync_env,
                     rgw_pool& _pool,
                     uint32_t _shard_id, rgw_data_sync_marker& _marker, bool *_reset_backoff)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      pool(_pool),
      shard_id(_shard_id),
      sync_marker(_marker),
      marker_tracker(NULL), truncated(false), inc_lock("RGWDataSyncShardCR::inc_lock"),
      total_entries(0), spawn_window(BUCKET_SHARD_SYNC_SPAWN_WINDOW), reset_backoff(NULL),
      lease_cr(nullptr), lease_stack(nullptr), error_repo(nullptr), max_error_entries(DATA_SYNC_MAX_ERR_ENTRIES),
      retry_backoff_secs(RETRY_BACKOFF_SECS_DEFAULT) {
    set_description() << "data sync shard source_zone=" << sync_env->source_zone << " shard_id=" << shard_id;
    status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
    error_oid = status_oid + ".retry";

    logger.init(sync_env, "DataShard", status_oid);
  }

  ~RGWDataSyncShardCR() override {
    delete marker_tracker;
    if (lease_cr) {
      lease_cr->abort();
    }
    if (error_repo) {
      error_repo->put();
    }
  }

  void append_modified_shards(set<string>& keys) {
    Mutex::Locker l(inc_lock);
    modified_shards.insert(keys.begin(), keys.end());
  }

  void set_marker_tracker(RGWDataSyncShardMarkerTrack *mt) {
    delete marker_tracker;
    marker_tracker = mt;
  }

  int operate() override {
    int r;
    while (true) {
      switch (sync_marker.state) {
      case rgw_data_sync_marker::FullSync:
        r = full_sync();
        if (r < 0) {
          ldout(cct, 10) << "sync: full_sync: shard_id=" << shard_id << " r=" << r << dendl;
          return set_cr_error(r);
        }
        return 0;
      case rgw_data_sync_marker::IncrementalSync:
        r = incremental_sync();
        if (r < 0) {
          ldout(cct, 10) << "sync: incremental_sync: shard_id=" << shard_id << " r=" << r << dendl;
          return set_cr_error(r);
        }
        return 0;
      default:
        return set_cr_error(-EIO);
      }
    }
    return 0;
  }

  void init_lease_cr() {
    set_status("acquiring sync lock");
    uint32_t lock_duration = cct->_conf->rgw_sync_lease_period;
    string lock_name = "sync_lock";
    if (lease_cr) {
      lease_cr->abort();
    }
    RGWRados *store = sync_env->store;
    lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
                                            rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
                                            lock_name, lock_duration, this));
    lease_stack.reset(spawn(lease_cr.get(), false));
  }

  int full_sync() {
#define OMAP_GET_MAX_ENTRIES 100
    int max_entries = OMAP_GET_MAX_ENTRIES;
    reenter(&full_cr) {
      yield init_lease_cr();
      while (!lease_cr->is_locked()) {
        if (lease_cr->is_done()) {
          ldout(cct, 5) << "lease cr failed, done early " << dendl;
          set_status("lease lock failed, early abort");
          return set_cr_error(lease_cr->get_ret_status());
        }
        set_sleeping(true);
        yield;
      }
      logger.log("full sync");
      oid = full_data_sync_index_shard_oid(sync_env->source_zone, shard_id);
      set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker));
      total_entries = sync_marker.pos;
      do {
        if (!lease_cr->is_locked()) {
          stop_spawned_services();
          drain_all();
          return set_cr_error(-ECANCELED);
        }
        yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, oid), sync_marker.marker, &entries, max_entries));
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): RGWRadosGetOmapKeysCR() returned ret=" << retcode << dendl;
          lease_cr->go_down();
          drain_all();
          return set_cr_error(retcode);
        }
        iter = entries.begin();
        for (; iter != entries.end(); ++iter) {
          ldout(sync_env->cct, 20) << __func__ << ": full sync: " << *iter << dendl;
          total_entries++;
          if (!marker_tracker->start(*iter, total_entries, real_time())) {
            ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << *iter << ". Duplicate entry?" << dendl;
          } else {
            // fetch remote and write locally
            yield spawn(new RGWDataSyncSingleEntryCR(sync_env, *iter, *iter, marker_tracker, error_repo, false), false);
          }
          sync_marker.marker = *iter;

          while ((int)num_spawned() > spawn_window) {
            set_status() << "num_spawned() > spawn_window";
            yield wait_for_child();
            int ret;
            while (collect(&ret, lease_stack.get())) {
              if (ret < 0) {
                ldout(cct, 10) << "a sync operation returned error" << dendl;
              }
            }
          }
        }
      } while ((int)entries.size() == max_entries);

      drain_all_but_stack(lease_stack.get());

      yield {
        /* update marker to reflect we're done with full sync */
        sync_marker.state = rgw_data_sync_marker::IncrementalSync;
        sync_marker.marker = sync_marker.next_step_marker;
        sync_marker.next_step_marker.clear();
        RGWRados *store = sync_env->store;
        call(new RGWSimpleRadosWriteCR<rgw_data_sync_marker>(sync_env->async_rados, store,
                                                             rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
                                                             sync_marker));
      }
      if (retcode < 0) {
        ldout(sync_env->cct, 0) << "ERROR: failed to set sync marker: retcode=" << retcode << dendl;
        lease_cr->go_down();
        drain_all();
        return set_cr_error(retcode);
      }
      // keep lease and transition to incremental_sync()
    }
    return 0;
  }

  int incremental_sync() {
    reenter(&incremental_cr) {
      ldout(cct, 10) << "start incremental sync" << dendl;
      if (lease_cr) {
        ldout(cct, 10) << "lease already held from full sync" << dendl;
      } else {
        yield init_lease_cr();
        while (!lease_cr->is_locked()) {
          if (lease_cr->is_done()) {
            ldout(cct, 5) << "lease cr failed, done early " << dendl;
            set_status("lease lock failed, early abort");
            return set_cr_error(lease_cr->get_ret_status());
          }
          set_sleeping(true);
          yield;
        }
        set_status("lease acquired");
        ldout(cct, 10) << "took lease" << dendl;
      }
      error_repo = new RGWOmapAppend(sync_env->async_rados, sync_env->store,
                                     rgw_raw_obj(pool, error_oid),
                                     1 /* no buffer */);
      error_repo->get();
      spawn(error_repo, false);
      logger.log("inc sync");
      set_marker_tracker(new RGWDataSyncShardMarkerTrack(sync_env, status_oid, sync_marker));
      do {
        if (!lease_cr->is_locked()) {
          stop_spawned_services();
          drain_all();
          return set_cr_error(-ECANCELED);
        }
        current_modified.clear();
        inc_lock.Lock();
        current_modified.swap(modified_shards);
        inc_lock.Unlock();

        /* process out of band updates */
        for (modified_iter = current_modified.begin(); modified_iter != current_modified.end(); ++modified_iter) {
          yield {
            ldout(sync_env->cct, 20) << __func__ << "(): async update notification: " << *modified_iter << dendl;
            spawn(new RGWDataSyncSingleEntryCR(sync_env, *modified_iter, string(), marker_tracker, error_repo, false), false);
          }
        }

        if (error_retry_time <= ceph::coarse_real_clock::now()) {
          /* process bucket shards that previously failed */
          yield call(new RGWRadosGetOmapKeysCR(sync_env->store, rgw_raw_obj(pool, error_oid),
                                               error_marker, &error_entries,
                                               max_error_entries));
          ldout(sync_env->cct, 20) << __func__ << "(): read error repo, got " << error_entries.size() << " entries" << dendl;
          iter = error_entries.begin();
          for (; iter != error_entries.end(); ++iter) {
            error_marker = *iter;
            ldout(sync_env->cct, 20) << __func__ << "(): handle error entry: " << error_marker << dendl;
            spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true), false);
          }
          if ((int)error_entries.size() != max_error_entries) {
            if (error_marker.empty() && error_entries.empty()) {
              /* the retry repo is empty, we back off a bit before calling it again */
              retry_backoff_secs *= 2;
              if (retry_backoff_secs > RETRY_BACKOFF_SECS_MAX) {
                retry_backoff_secs = RETRY_BACKOFF_SECS_MAX;
              }
            } else {
              retry_backoff_secs = RETRY_BACKOFF_SECS_DEFAULT;
            }
            error_retry_time = ceph::coarse_real_clock::now() + make_timespan(retry_backoff_secs);
            error_marker.clear();
          }
        }

#define INCREMENTAL_MAX_ENTRIES 100
        ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " sync_marker=" << sync_marker.marker << dendl;
        spawned_keys.clear();
        yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &sync_marker.marker, &log_entries, &truncated));
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to read remote data log info: ret=" << retcode << dendl;
          stop_spawned_services();
          drain_all();
          return set_cr_error(retcode);
        }
        for (log_iter = log_entries.begin(); log_iter != log_entries.end(); ++log_iter) {
          ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " log_entry: " << log_iter->log_id << ":" << log_iter->log_timestamp << ":" << log_iter->entry.key << dendl;
          if (!marker_tracker->index_key_to_marker(log_iter->entry.key, log_iter->log_id)) {
            ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << log_iter->log_id << ":" << log_iter->entry.key << " sync already in progress for bucket shard" << dendl;
            marker_tracker->try_update_high_marker(log_iter->log_id, 0, log_iter->log_timestamp);
            continue;
          }
          if (!marker_tracker->start(log_iter->log_id, 0, log_iter->log_timestamp)) {
            ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << log_iter->log_id << ". Duplicate entry?" << dendl;
          } else {
            /*
             * don't spawn the same key more than once. We can do that as long as we don't yield
             */
            if (spawned_keys.find(log_iter->entry.key) == spawned_keys.end()) {
              spawned_keys.insert(log_iter->entry.key);
              spawn(new RGWDataSyncSingleEntryCR(sync_env, log_iter->entry.key, log_iter->log_id, marker_tracker, error_repo, false), false);
              if (retcode < 0) {
                stop_spawned_services();
                drain_all();
                return set_cr_error(retcode);
              }
            }
          }
          while ((int)num_spawned() > spawn_window) {
            set_status() << "num_spawned() > spawn_window";
            yield wait_for_child();
            int ret;
            while (collect(&ret, lease_stack.get())) {
              if (ret < 0) {
                ldout(sync_env->cct, 10) << "a sync operation returned error" << dendl;
                /* we have reported this error */
              }
              /* not waiting for child here */
            }
            /* not waiting for child here */
          }
        }
        ldout(sync_env->cct, 20) << __func__ << ":" << __LINE__ << ": shard_id=" << shard_id << " sync_marker=" << sync_marker.marker << " truncated=" << truncated << dendl;
        if (!truncated) {
          yield wait(get_idle_interval());
        }
      } while (true);
    }
    return 0;
  }

  utime_t get_idle_interval() const {
#define INCREMENTAL_INTERVAL 20
    ceph::timespan interval = std::chrono::seconds(INCREMENTAL_INTERVAL);
    if (!ceph::coarse_real_clock::is_zero(error_retry_time)) {
      auto now = ceph::coarse_real_clock::now();
      if (error_retry_time > now) {
        auto d = error_retry_time - now;
        if (interval > d) {
          interval = d;
        }
      }
    }
    // convert timespan -> time_point -> utime_t
    return utime_t(ceph::coarse_real_clock::zero() + interval);
  }

  void stop_spawned_services() {
    lease_cr->go_down();
    if (error_repo) {
      error_repo->finish();
      error_repo->put();
      error_repo = NULL;
    }
  }
};

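/* Recap of RGWDataSyncShardCR: full_sync() drains the per-shard full-sync
 * omap index (built by RGWListBucketIndexesCR), then switches
 * sync_marker.state to IncrementalSync while keeping the lease.
 * incremental_sync() then loops over (1) out-of-band change notifications,
 * (2) the ".retry" error repo once error_retry_time has passed, backing off
 * exponentially up to RETRY_BACKOFF_SECS_MAX while it stays empty, and
 * (3) newly listed datalog entries, idling get_idle_interval() whenever the
 * remote log is not truncated. */
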
class RGWDataSyncShardControlCR : public RGWBackoffControlCR {
  RGWDataSyncEnv *sync_env;

  rgw_pool pool;

  uint32_t shard_id;
  rgw_data_sync_marker sync_marker;

public:
  RGWDataSyncShardControlCR(RGWDataSyncEnv *_sync_env, rgw_pool& _pool,
                            uint32_t _shard_id, rgw_data_sync_marker& _marker)
    : RGWBackoffControlCR(_sync_env->cct, false),
      sync_env(_sync_env),
      pool(_pool),
      shard_id(_shard_id),
      sync_marker(_marker) {
  }

  RGWCoroutine *alloc_cr() override {
    return new RGWDataSyncShardCR(sync_env, pool, shard_id, sync_marker, backoff_ptr());
  }

  RGWCoroutine *alloc_finisher_cr() override {
    RGWRados *store = sync_env->store;
    return new RGWSimpleRadosReadCR<rgw_data_sync_marker>(sync_env->async_rados, store,
                                                          rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id)),
                                                          &sync_marker);
  }

  void append_modified_shards(set<string>& keys) {
    Mutex::Locker l(cr_lock());

    RGWDataSyncShardCR *cr = static_cast<RGWDataSyncShardCR *>(get_cr());
    if (!cr) {
      return;
    }

    cr->append_modified_shards(keys);
  }
};

class RGWDataSyncCR : public RGWCoroutine {
  RGWDataSyncEnv *sync_env;
  uint32_t num_shards;

  rgw_data_sync_status sync_status;

  RGWDataSyncShardMarkerTrack *marker_tracker;

  Mutex shard_crs_lock;
  map<int, RGWDataSyncShardControlCR *> shard_crs;

  bool *reset_backoff;

  RGWDataSyncDebugLogger logger;

  RGWDataSyncModule *data_sync_module{nullptr};
public:
  RGWDataSyncCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards, bool *_reset_backoff)
    : RGWCoroutine(_sync_env->cct),
      sync_env(_sync_env),
      num_shards(_num_shards),
      marker_tracker(NULL),
      shard_crs_lock("RGWDataSyncCR::shard_crs_lock"),
      reset_backoff(_reset_backoff), logger(sync_env, "Data", "all") {
  }

  ~RGWDataSyncCR() override {
    for (auto iter : shard_crs) {
      iter.second->put();
    }
  }

  int operate() override {
    reenter(this) {

      /* read sync status */
      yield call(new RGWReadDataSyncStatusCoroutine(sync_env, &sync_status));

      data_sync_module = sync_env->sync_module->get_data_handler();

      if (retcode < 0 && retcode != -ENOENT) {
        ldout(sync_env->cct, 0) << "ERROR: failed to fetch sync status, retcode=" << retcode << dendl;
        return set_cr_error(retcode);
      }

      /* state: init status */
      if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateInit) {
        ldout(sync_env->cct, 20) << __func__ << "(): init" << dendl;
        sync_status.sync_info.num_shards = num_shards;
        uint64_t instance_id;
        get_random_bytes((char *)&instance_id, sizeof(instance_id));
        yield call(new RGWInitDataSyncStatusCoroutine(sync_env, num_shards, instance_id, &sync_status));
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to init sync, retcode=" << retcode << dendl;
          return set_cr_error(retcode);
        }
        // sets state = StateBuildingFullSyncMaps

        *reset_backoff = true;
      }

      data_sync_module->init(sync_env, sync_status.sync_info.instance_id);

      if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) {
        /* call sync module init here */
        sync_status.sync_info.num_shards = num_shards;
        yield call(data_sync_module->init_sync(sync_env));
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: sync module init_sync() failed, retcode=" << retcode << dendl;
          return set_cr_error(retcode);
        }
        /* state: building full sync maps */
        ldout(sync_env->cct, 20) << __func__ << "(): building full sync maps" << dendl;
        yield call(new RGWListBucketIndexesCR(sync_env, &sync_status));
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to build full sync maps, retcode=" << retcode << dendl;
          return set_cr_error(retcode);
        }
        sync_status.sync_info.state = rgw_data_sync_info::StateSync;

        /* update new state */
        yield call(set_sync_info_cr());
        if (retcode < 0) {
          ldout(sync_env->cct, 0) << "ERROR: failed to write sync status, retcode=" << retcode << dendl;
          return set_cr_error(retcode);
        }

        *reset_backoff = true;
      }

      yield {
        if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateSync) {
          for (map<uint32_t, rgw_data_sync_marker>::iterator iter = sync_status.sync_markers.begin();
               iter != sync_status.sync_markers.end(); ++iter) {
            RGWDataSyncShardControlCR *cr = new RGWDataSyncShardControlCR(sync_env, sync_env->store->get_zone_params().log_pool,
                                                                          iter->first, iter->second);
            cr->get();
            shard_crs_lock.Lock();
            shard_crs[iter->first] = cr;
            shard_crs_lock.Unlock();
            spawn(cr, true);
          }
        }
      }

      return set_cr_done();
    }
    return 0;
  }

  RGWCoroutine *set_sync_info_cr() {
    RGWRados *store = sync_env->store;
    return new RGWSimpleRadosWriteCR<rgw_data_sync_info>(sync_env->async_rados, store,
                                                         rgw_raw_obj(store->get_zone_params().log_pool, RGWDataSyncStatusManager::sync_status_oid(sync_env->source_zone)),
                                                         sync_status.sync_info);
  }

  void wakeup(int shard_id, set<string>& keys) {
    Mutex::Locker l(shard_crs_lock);
    map<int, RGWDataSyncShardControlCR *>::iterator iter = shard_crs.find(shard_id);
    if (iter == shard_crs.end()) {
      return;
    }
    iter->second->append_modified_shards(keys);
    iter->second->wakeup();
  }
};

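/* Top-level data sync state machine, as driven by RGWDataSyncCR::operate():
 *   StateInit                 --RGWInitDataSyncStatusCoroutine-->
 *   StateBuildingFullSyncMaps --RGWListBucketIndexesCR-->
 *   StateSync                 --> one RGWDataSyncShardControlCR per shard,
 * each wrapping an RGWDataSyncShardCR with backoff/retry semantics. */
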
1637class RGWDefaultDataSyncModule : public RGWDataSyncModule {
1638public:
1639 RGWDefaultDataSyncModule() {}
1640
91327a77 1641 RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, boost::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override;
31f18b77 1642 RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
7c673cae 1643 RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
31f18b77 1644 rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override;
7c673cae
FG
1645};
1646
1647class RGWDefaultSyncModuleInstance : public RGWSyncModuleInstance {
1648 RGWDefaultDataSyncModule data_handler;
1649public:
1650 RGWDefaultSyncModuleInstance() {}
1651 RGWDataSyncModule *get_data_handler() override {
1652 return &data_handler;
1653 }
1654};
1655
31f18b77 1656int RGWDefaultSyncModule::create_instance(CephContext *cct, map<string, string, ltstr_nocase>& config, RGWSyncModuleInstanceRef *instance)
7c673cae
FG
1657{
1658 instance->reset(new RGWDefaultSyncModuleInstance());
1659 return 0;
1660}
1661
91327a77 1662RGWCoroutine *RGWDefaultDataSyncModule::sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, boost::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace)
7c673cae
FG
1663{
1664 return new RGWFetchRemoteObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone, bucket_info,
1665 key, versioned_epoch,
31f18b77 1666 true, zones_trace);
1667}
1668
1669RGWCoroutine *RGWDefaultDataSyncModule::remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key,
31f18b77 1670 real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
1671{
1672 return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
1673 bucket_info, key, versioned, versioned_epoch,
31f18b77 1674 NULL, NULL, false, &mtime, zones_trace);
1675}
1676
1677RGWCoroutine *RGWDefaultDataSyncModule::create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
31f18b77 1678 rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace)
1679{
1680 return new RGWRemoveObjCR(sync_env->async_rados, sync_env->store, sync_env->source_zone,
1681 bucket_info, key, versioned, versioned_epoch,
31f18b77 1682 &owner.id, &owner.display_name, true, &mtime, zones_trace);
1683}
1684
1685class RGWDataSyncControlCR : public RGWBackoffControlCR
1686{
1687 RGWDataSyncEnv *sync_env;
1688 uint32_t num_shards;
1689
3efd9988 1690 static constexpr bool exit_on_error = false; // retry on all errors
7c673cae 1691public:
3efd9988 1692 RGWDataSyncControlCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards) : RGWBackoffControlCR(_sync_env->cct, exit_on_error),
1693 sync_env(_sync_env), num_shards(_num_shards) {
1694 }
1695
1696 RGWCoroutine *alloc_cr() override {
1697 return new RGWDataSyncCR(sync_env, num_shards, backoff_ptr());
1698 }
1699
1700 void wakeup(int shard_id, set<string>& keys) {
1701 Mutex& m = cr_lock();
1702
1703 m.Lock();
1704 RGWDataSyncCR *cr = static_cast<RGWDataSyncCR *>(get_cr());
1705 if (!cr) {
1706 m.Unlock();
1707 return;
1708 }
1709
1710 cr->get();
1711 m.Unlock();
1712
1713 if (cr) {
1714 cr->wakeup(shard_id, keys);
1715 }
1716
1717 cr->put();
1718 }
1719};
1720
1721void RGWRemoteDataLog::wakeup(int shard_id, set<string>& keys) {
1722 RWLock::RLocker rl(lock);
1723 if (!data_sync_cr) {
1724 return;
1725 }
1726 data_sync_cr->wakeup(shard_id, keys);
1727}
1728
1729int RGWRemoteDataLog::run_sync(int num_shards)
1730{
1731 lock.get_write();
1732 data_sync_cr = new RGWDataSyncControlCR(&sync_env, num_shards);
1733 data_sync_cr->get(); // run() will drop a ref, so take another
1734 lock.unlock();
1735
1736 int r = run(data_sync_cr);
1737
1738 lock.get_write();
1739 data_sync_cr->put();
1740 data_sync_cr = NULL;
1741 lock.unlock();
1742
1743 if (r < 0) {
1744 ldout(store->ctx(), 0) << "ERROR: failed to run sync" << dendl;
1745 return r;
1746 }
1747 return 0;
1748}
1749
1750int RGWDataSyncStatusManager::init()
1751{
1752 auto zone_def_iter = store->zone_by_id.find(source_zone);
1753 if (zone_def_iter == store->zone_by_id.end()) {
1754 ldout(store->ctx(), 0) << "ERROR: failed to find zone config info for zone=" << source_zone << dendl;
1755 return -EIO;
1756 }
1757
1758 auto& zone_def = zone_def_iter->second;
1759
1760 if (!store->get_sync_modules_manager()->supports_data_export(zone_def.tier_type)) {
1761 return -ENOTSUP;
1762 }
1763
1764 RGWZoneParams& zone_params = store->get_zone_params();
1765
1766 if (sync_module == nullptr) {
1767 sync_module = store->get_sync_module();
1768 }
1769
1770 conn = store->get_zone_conn_by_id(source_zone);
1771 if (!conn) {
1772 ldout(store->ctx(), 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
1773 return -EINVAL;
1774 }
1775
1776 error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
1777
1778 int r = source_log.init(source_zone, conn, error_logger, sync_module);
1779 if (r < 0) {
1780 lderr(store->ctx()) << "ERROR: failed to init remote log, r=" << r << dendl;
1781 finalize();
1782 return r;
1783 }
1784
1785 rgw_datalog_info datalog_info;
1786 r = source_log.read_log_info(&datalog_info);
1787 if (r < 0) {
1788     ldout(store->ctx(), 5) << "ERROR: source_log.read_log_info() returned r=" << r << dendl;
1789 finalize();
1790 return r;
1791 }
1792
1793 num_shards = datalog_info.num_shards;
1794
1795 for (int i = 0; i < num_shards; i++) {
1796 shard_objs[i] = rgw_raw_obj(zone_params.log_pool, shard_obj_name(source_zone, i));
1797 }
1798
1799 return 0;
1800}
1801
1802void RGWDataSyncStatusManager::finalize()
1803{
1804 delete error_logger;
1805 error_logger = nullptr;
1806}
1807
1808string RGWDataSyncStatusManager::sync_status_oid(const string& source_zone)
1809{
1810 char buf[datalog_sync_status_oid_prefix.size() + source_zone.size() + 16];
1811 snprintf(buf, sizeof(buf), "%s.%s", datalog_sync_status_oid_prefix.c_str(), source_zone.c_str());
1812
1813 return string(buf);
1814}
1815
1816string RGWDataSyncStatusManager::shard_obj_name(const string& source_zone, int shard_id)
1817{
1818 char buf[datalog_sync_status_shard_prefix.size() + source_zone.size() + 16];
1819 snprintf(buf, sizeof(buf), "%s.%s.%d", datalog_sync_status_shard_prefix.c_str(), source_zone.c_str(), shard_id);
1820
1821 return string(buf);
1822}
1823
1824int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn,
1825 const rgw_bucket& bucket, int shard_id,
1826 RGWSyncErrorLogger *_error_logger,
1827 RGWSyncModuleInstanceRef& _sync_module)
1828{
1829 conn = _conn;
1830 source_zone = _source_zone;
1831 bs.bucket = bucket;
1832 bs.shard_id = shard_id;
1833
b32b8144 1834 sync_env.init(store->ctx(), store, conn, async_rados, http_manager,
91327a77 1835 _error_logger, source_zone, _sync_module);
1836
1837 return 0;
1838}
1839
1840class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine {
1841 RGWDataSyncEnv *sync_env;
1842 const string instance_key;
1843
28e407b8 1844 rgw_bucket_index_marker_info *info;
1845
1846public:
1847 RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncEnv *_sync_env,
1848 const rgw_bucket_shard& bs,
28e407b8 1849 rgw_bucket_index_marker_info *_info)
1850 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
1851 instance_key(bs.get_key()), info(_info) {}
1852
1853 int operate() override {
1854 reenter(this) {
1855 yield {
1856 rgw_http_param_pair pairs[] = { { "type" , "bucket-index" },
1857 { "bucket-instance", instance_key.c_str() },
1858 { "info" , NULL },
1859 { NULL, NULL } };
1860
1861 string p = "/admin/log/";
28e407b8 1862 call(new RGWReadRESTResourceCR<rgw_bucket_index_marker_info>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, info));
1863 }
1864 if (retcode < 0) {
1865 return set_cr_error(retcode);
1866 }
1867 return set_cr_done();
1868 }
1869 return 0;
1870 }
1871};
1872
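/* Initialize a bucket shard's sync status object: fetch the remote
 * bilog position, seed the incremental marker with it, and start in
 * StateFullSync. If the remote reports syncstopped, the status object
 * is removed instead and the coroutine fails with -ENOENT. */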
1873class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
1874 RGWDataSyncEnv *sync_env;
1875
1876 rgw_bucket_shard bs;
1877 const string sync_status_oid;
1878
1879 rgw_bucket_shard_sync_info& status;
1880
28e407b8 1881 rgw_bucket_index_marker_info info;
1882public:
1883 RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
1884 const rgw_bucket_shard& bs,
1885 rgw_bucket_shard_sync_info& _status)
1886 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
1887 sync_status_oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
1888 status(_status)
1889 {}
1890
1891 int operate() override {
1892 reenter(this) {
1893 /* fetch current position in logs */
1894 yield call(new RGWReadRemoteBucketIndexLogInfoCR(sync_env, bs, &info));
1895 if (retcode < 0 && retcode != -ENOENT) {
1896 ldout(cct, 0) << "ERROR: failed to fetch bucket index status" << dendl;
1897 return set_cr_error(retcode);
1898 }
1899 yield {
7c673cae 1900 auto store = sync_env->store;
1901 rgw_raw_obj obj(store->get_zone_params().log_pool, sync_status_oid);
1902
1903 if (info.syncstopped) {
1904 call(new RGWRadosRemoveCR(store, obj));
1905 } else {
1906 status.state = rgw_bucket_shard_sync_info::StateFullSync;
1907 status.inc_marker.position = info.max_marker;
1908 map<string, bufferlist> attrs;
1909 status.encode_all_attrs(attrs);
1910 call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store, obj, attrs));
1911 }
7c673cae 1912 }
1913 if (info.syncstopped) {
1914 retcode = -ENOENT;
1915 }
1916 if (retcode < 0) {
1917 return set_cr_error(retcode);
1918 }
1919 return set_cr_done();
1920 }
1921 return 0;
1922 }
1923};
1924
1925RGWCoroutine *RGWRemoteBucketLog::init_sync_status_cr()
1926{
1927 return new RGWInitBucketShardSyncStatusCoroutine(&sync_env, bs, init_status);
1928}
1929
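/* decode a single sync-status xattr; a missing attribute yields a
 * default-constructed value, and a decode failure is logged but is not
 * treated as fatal */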
1930template <class T>
1931static void decode_attr(CephContext *cct, map<string, bufferlist>& attrs, const string& attr_name, T *val)
1932{
1933 map<string, bufferlist>::iterator iter = attrs.find(attr_name);
1934 if (iter == attrs.end()) {
1935 *val = T();
1936 return;
1937 }
1938
1939 bufferlist::iterator biter = iter->second.begin();
1940 try {
1941 ::decode(*val, biter);
1942 } catch (buffer::error& err) {
1943 ldout(cct, 0) << "ERROR: failed to decode attribute: " << attr_name << dendl;
1944 }
1945}
1946
1947void rgw_bucket_shard_sync_info::decode_from_attrs(CephContext *cct, map<string, bufferlist>& attrs)
1948{
1949 decode_attr(cct, attrs, "state", &state);
1950 decode_attr(cct, attrs, "full_marker", &full_marker);
1951 decode_attr(cct, attrs, "inc_marker", &inc_marker);
1952}
1953
1954void rgw_bucket_shard_sync_info::encode_all_attrs(map<string, bufferlist>& attrs)
1955{
1956 encode_state_attr(attrs);
1957 full_marker.encode_attr(attrs);
1958 inc_marker.encode_attr(attrs);
1959}
1960
1961void rgw_bucket_shard_sync_info::encode_state_attr(map<string, bufferlist>& attrs)
1962{
1963 ::encode(state, attrs["state"]);
1964}
1965
1966void rgw_bucket_shard_full_sync_marker::encode_attr(map<string, bufferlist>& attrs)
1967{
1968 ::encode(*this, attrs["full_marker"]);
1969}
1970
1971void rgw_bucket_shard_inc_sync_marker::encode_attr(map<string, bufferlist>& attrs)
1972{
1973 ::encode(*this, attrs["inc_marker"]);
1974}
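/* the shard sync state is persisted as separate xattrs ("state",
 * "full_marker", "inc_marker") on a single status object, so each part
 * can be updated without rewriting the others */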
1975
1976class RGWReadBucketSyncStatusCoroutine : public RGWCoroutine {
1977 RGWDataSyncEnv *sync_env;
1978 string oid;
1979 rgw_bucket_shard_sync_info *status;
1980
1981 map<string, bufferlist> attrs;
1982public:
1983 RGWReadBucketSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
1984 const rgw_bucket_shard& bs,
1985 rgw_bucket_shard_sync_info *_status)
1986 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
1987 oid(RGWBucketSyncStatusManager::status_oid(sync_env->source_zone, bs)),
1988 status(_status) {}
1989 int operate() override;
1990};
1991
1992int RGWReadBucketSyncStatusCoroutine::operate()
1993{
1994 reenter(this) {
1995 yield call(new RGWSimpleRadosReadAttrsCR(sync_env->async_rados, sync_env->store,
1996 rgw_raw_obj(sync_env->store->get_zone_params().log_pool, oid),
1997 &attrs));
1998 if (retcode == -ENOENT) {
1999 *status = rgw_bucket_shard_sync_info();
2000 return set_cr_done();
2001 }
2002 if (retcode < 0) {
2003       ldout(sync_env->cct, 0) << "ERROR: failed to fetch bucket shard info oid=" << oid << " ret=" << retcode << dendl;
2004 return set_cr_error(retcode);
2005 }
2006 status->decode_from_attrs(sync_env->cct, attrs);
2007 return set_cr_done();
2008 }
2009 return 0;
2010}
2011
2012#define OMAP_READ_MAX_ENTRIES 10
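// page through the shard's ".retry" omap object, OMAP_READ_MAX_ENTRIES
// keys at a time, collecting bucket shards that still have sync errors
// pending retry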
2013class RGWReadRecoveringBucketShardsCoroutine : public RGWCoroutine {
2014 RGWDataSyncEnv *sync_env;
2015 RGWRados *store;
2016
2017 const int shard_id;
2018 int max_entries;
2019
2020 set<string>& recovering_buckets;
2021 string marker;
2022 string error_oid;
2023
2024 set<string> error_entries;
2025 int max_omap_entries;
2026 int count;
2027
2028public:
2029 RGWReadRecoveringBucketShardsCoroutine(RGWDataSyncEnv *_sync_env, const int _shard_id,
2030 set<string>& _recovering_buckets, const int _max_entries)
2031 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
2032 store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries),
2033 recovering_buckets(_recovering_buckets), max_omap_entries(OMAP_READ_MAX_ENTRIES)
2034 {
2035 error_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id) + ".retry";
2036 }
2037
2038 int operate() override;
2039};
2040
2041int RGWReadRecoveringBucketShardsCoroutine::operate()
2042{
2043   reenter(this) {
2044     // read recovering bucket shards
2045 count = 0;
2046 do {
2047 yield call(new RGWRadosGetOmapKeysCR(store, rgw_raw_obj(store->get_zone_params().log_pool, error_oid),
2048 marker, &error_entries, max_omap_entries));
2049
2050 if (retcode == -ENOENT) {
2051 break;
2052 }
2053
2054 if (retcode < 0) {
2055 ldout(sync_env->cct, 0) << "failed to read recovering bucket shards with "
2056 << cpp_strerror(retcode) << dendl;
2057 return set_cr_error(retcode);
2058 }
2059
2060 if (error_entries.empty()) {
2061 break;
2062 }
2063
2064 count += error_entries.size();
2065 marker = *error_entries.rbegin();
2066 recovering_buckets.insert(error_entries.begin(), error_entries.end());
2067     } while ((int)error_entries.size() == max_omap_entries && count < max_entries);
2068
2069 return set_cr_done();
2070 }
2071
2072 return 0;
2073}
2074
2075class RGWReadPendingBucketShardsCoroutine : public RGWCoroutine {
2076 RGWDataSyncEnv *sync_env;
2077 RGWRados *store;
2078
2079 const int shard_id;
2080 int max_entries;
2081
2082 set<string>& pending_buckets;
2083 string marker;
2084 string status_oid;
2085
2086 rgw_data_sync_marker* sync_marker;
2087 int count;
2088
2089 list<rgw_data_change_log_entry> log_entries;
2090 bool truncated;
2091
2092public:
2093 RGWReadPendingBucketShardsCoroutine(RGWDataSyncEnv *_sync_env, const int _shard_id,
2094 set<string>& _pending_buckets,
2095 rgw_data_sync_marker* _sync_marker, const int _max_entries)
2096 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
2097 store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries),
2098 pending_buckets(_pending_buckets), sync_marker(_sync_marker)
2099 {
2100 status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
2101 }
2102
2103 int operate() override;
2104};
2105
2106int RGWReadPendingBucketShardsCoroutine::operate()
2107{
2108   reenter(this) {
2109     // read sync status marker
2110 using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
2111 yield call(new CR(sync_env->async_rados, store,
2112 rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
2113 sync_marker));
2114 if (retcode < 0) {
2115       ldout(sync_env->cct, 0) << "failed to read sync status marker with "
2116 << cpp_strerror(retcode) << dendl;
2117 return set_cr_error(retcode);
2118 }
2119
2120     // read pending bucket shards
2121 marker = sync_marker->marker;
2122 count = 0;
2123     do {
2124 yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &marker, &log_entries, &truncated));
2125
2126 if (retcode == -ENOENT) {
2127 break;
2128 }
2129
2130 if (retcode < 0) {
2131         ldout(sync_env->cct, 0) << "failed to read remote data log info with "
2132 << cpp_strerror(retcode) << dendl;
2133 return set_cr_error(retcode);
2134 }
2135
2136 if (log_entries.empty()) {
2137 break;
2138 }
2139
2140 count += log_entries.size();
2141 for (const auto& entry : log_entries) {
2142 pending_buckets.insert(entry.entry.key);
2143 }
2144     } while (truncated && count < max_entries);
2145
2146 return set_cr_done();
2147 }
2148
2149 return 0;
2150}
2151
2152int RGWRemoteDataLog::read_shard_status(int shard_id, set<string>& pending_buckets, set<string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries)
2153{
2154 // cannot run concurrently with run_sync(), so run in a separate manager
2155 RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
2156 RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
2157 int ret = http_manager.set_threaded();
2158 if (ret < 0) {
2159 ldout(store->ctx(), 0) << "failed in http_manager.start() ret=" << ret << dendl;
2160 return ret;
2161 }
2162 RGWDataSyncEnv sync_env_local = sync_env;
2163 sync_env_local.http_manager = &http_manager;
2164 list<RGWCoroutinesStack *> stacks;
2165 RGWCoroutinesStack* recovering_stack = new RGWCoroutinesStack(store->ctx(), &crs);
2166 recovering_stack->call(new RGWReadRecoveringBucketShardsCoroutine(&sync_env_local, shard_id, recovering_buckets, max_entries));
2167 stacks.push_back(recovering_stack);
2168 RGWCoroutinesStack* pending_stack = new RGWCoroutinesStack(store->ctx(), &crs);
2169 pending_stack->call(new RGWReadPendingBucketShardsCoroutine(&sync_env_local, shard_id, pending_buckets, sync_marker, max_entries));
2170 stacks.push_back(pending_stack);
2171 ret = crs.run(stacks);
2172 http_manager.stop();
2173 return ret;
2174}
2175
2176RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status)
2177{
2178 return new RGWReadBucketSyncStatusCoroutine(&sync_env, bs, sync_status);
2179}
2180
2181RGWBucketSyncStatusManager::~RGWBucketSyncStatusManager() {
2182 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
2183 delete iter->second;
2184 }
2185 delete error_logger;
2186}
2187
2188
2189void rgw_bucket_entry_owner::decode_json(JSONObj *obj)
2190{
2191 JSONDecoder::decode_json("ID", id, obj);
2192 JSONDecoder::decode_json("DisplayName", display_name, obj);
2193}
2194
2195struct bucket_list_entry {
2196 bool delete_marker;
2197 rgw_obj_key key;
2198 bool is_latest;
2199 real_time mtime;
2200 string etag;
2201 uint64_t size;
2202 string storage_class;
2203 rgw_bucket_entry_owner owner;
2204 uint64_t versioned_epoch;
2205 string rgw_tag;
2206
2207 bucket_list_entry() : delete_marker(false), is_latest(false), size(0), versioned_epoch(0) {}
2208
2209 void decode_json(JSONObj *obj) {
2210 JSONDecoder::decode_json("IsDeleteMarker", delete_marker, obj);
2211 JSONDecoder::decode_json("Key", key.name, obj);
2212 JSONDecoder::decode_json("VersionId", key.instance, obj);
2213 JSONDecoder::decode_json("IsLatest", is_latest, obj);
2214 string mtime_str;
2215 JSONDecoder::decode_json("RgwxMtime", mtime_str, obj);
2216
2217 struct tm t;
2218 uint32_t nsec;
2219 if (parse_iso8601(mtime_str.c_str(), &t, &nsec)) {
2220 ceph_timespec ts;
2221 ts.tv_sec = (uint64_t)internal_timegm(&t);
2222 ts.tv_nsec = nsec;
2223 mtime = real_clock::from_ceph_timespec(ts);
2224 }
2225 JSONDecoder::decode_json("ETag", etag, obj);
2226 JSONDecoder::decode_json("Size", size, obj);
2227 JSONDecoder::decode_json("StorageClass", storage_class, obj);
2228 JSONDecoder::decode_json("Owner", owner, obj);
2229 JSONDecoder::decode_json("VersionedEpoch", versioned_epoch, obj);
2230 JSONDecoder::decode_json("RgwxTag", rgw_tag, obj);
2231 }
2232};
2233
2234struct bucket_list_result {
2235 string name;
2236 string prefix;
2237 string key_marker;
2238 string version_id_marker;
2239 int max_keys;
2240 bool is_truncated;
2241 list<bucket_list_entry> entries;
2242
2243 bucket_list_result() : max_keys(0), is_truncated(false) {}
2244
2245 void decode_json(JSONObj *obj) {
2246 JSONDecoder::decode_json("Name", name, obj);
2247 JSONDecoder::decode_json("Prefix", prefix, obj);
2248 JSONDecoder::decode_json("KeyMarker", key_marker, obj);
2249 JSONDecoder::decode_json("VersionIdMarker", version_id_marker, obj);
2250 JSONDecoder::decode_json("MaxKeys", max_keys, obj);
2251 JSONDecoder::decode_json("IsTruncated", is_truncated, obj);
2252 JSONDecoder::decode_json("Entries", entries, obj);
2253 }
2254};
2255
2256class RGWListBucketShardCR: public RGWCoroutine {
2257 RGWDataSyncEnv *sync_env;
2258 const rgw_bucket_shard& bs;
2259 const string instance_key;
2260 rgw_obj_key marker_position;
2261
2262 bucket_list_result *result;
2263
2264public:
2265 RGWListBucketShardCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
2266 rgw_obj_key& _marker_position, bucket_list_result *_result)
2267 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
2268 instance_key(bs.get_key()), marker_position(_marker_position),
2269 result(_result) {}
2270
2271 int operate() override {
2272 reenter(this) {
2273 yield {
2274 rgw_http_param_pair pairs[] = { { "rgwx-bucket-instance", instance_key.c_str() },
2275 { "versions" , NULL },
2276 { "format" , "json" },
2277 { "objs-container" , "true" },
2278 { "key-marker" , marker_position.name.c_str() },
2279 { "version-id-marker" , marker_position.instance.c_str() },
2280 { NULL, NULL } };
2281 // don't include tenant in the url, it's already part of instance_key
2282 string p = string("/") + bs.bucket.name;
2283 call(new RGWReadRESTResourceCR<bucket_list_result>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, result));
2284 }
2285 if (retcode < 0) {
2286 return set_cr_error(retcode);
2287 }
2288 return set_cr_done();
2289 }
2290 return 0;
2291 }
2292};
2293
2294class RGWListBucketIndexLogCR: public RGWCoroutine {
2295 RGWDataSyncEnv *sync_env;
2296 const string instance_key;
2297 string marker;
2298
2299 list<rgw_bi_log_entry> *result;
2300
2301public:
2302 RGWListBucketIndexLogCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
2303 string& _marker, list<rgw_bi_log_entry> *_result)
2304 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
2305 instance_key(bs.get_key()), marker(_marker), result(_result) {}
2306
2307 int operate() override {
2308 reenter(this) {
2309 yield {
2310 rgw_http_param_pair pairs[] = { { "bucket-instance", instance_key.c_str() },
2311 { "format" , "json" },
2312 { "marker" , marker.c_str() },
2313 { "type", "bucket-index" },
2314 { NULL, NULL } };
2315
2316 call(new RGWReadRESTResourceCR<list<rgw_bi_log_entry> >(sync_env->cct, sync_env->conn, sync_env->http_manager, "/admin/log", pairs, result));
2317 }
2318 if (retcode < 0) {
2319 return set_cr_error(retcode);
2320 }
2321 return set_cr_done();
2322 }
2323 return 0;
2324 }
2325};
2326
2327#define BUCKET_SYNC_UPDATE_MARKER_WINDOW 10
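// the marker trackers below batch per-entry completions and only
// persist an updated marker once a window of entries has completed
// (see RGWSyncShardMarkerTrack), bounding writes to the status object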
2328
2329class RGWBucketFullSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<rgw_obj_key, rgw_obj_key> {
2330 RGWDataSyncEnv *sync_env;
2331
2332 string marker_oid;
2333 rgw_bucket_shard_full_sync_marker sync_marker;
2334
2335public:
2336 RGWBucketFullSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
2337 const string& _marker_oid,
2338 const rgw_bucket_shard_full_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
2339 sync_env(_sync_env),
2340 marker_oid(_marker_oid),
2341 sync_marker(_marker) {}
2342
2343 RGWCoroutine *store_marker(const rgw_obj_key& new_marker, uint64_t index_pos, const real_time& timestamp) override {
2344 sync_marker.position = new_marker;
2345 sync_marker.count = index_pos;
2346
2347 map<string, bufferlist> attrs;
2348 sync_marker.encode_attr(attrs);
2349
2350 RGWRados *store = sync_env->store;
2351
2352 ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
2353 return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store,
2354 rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
2355 attrs);
2356 }
2357
2358 RGWOrderCallCR *allocate_order_control_cr() {
2359 return new RGWLastCallerWinsCR(sync_env->cct);
2360 }
2361};
2362
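// illustrative call order for the incremental marker tracker (a sketch,
// not a formal contract): a caller first checks can_do_op(key, is_olh)
// to avoid conflicting in-flight work, registers the entry with
// index_key_to_marker(), and handle_finish() unwinds the index once the
// entry's marker completes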
2363class RGWBucketIncSyncShardMarkerTrack : public RGWSyncShardMarkerTrack<string, rgw_obj_key> {
2364 RGWDataSyncEnv *sync_env;
2365
2366 string marker_oid;
2367 rgw_bucket_shard_inc_sync_marker sync_marker;
2368
2369 map<rgw_obj_key, string> key_to_marker;
2370
2371 struct operation {
2372 rgw_obj_key key;
2373 bool is_olh;
2374 };
2375 map<string, operation> marker_to_op;
2376 std::set<std::string> pending_olh; // object names with pending olh operations
2377
2378 void handle_finish(const string& marker) override {
2379 auto iter = marker_to_op.find(marker);
2380 if (iter == marker_to_op.end()) {
2381 return;
2382 }
2383 auto& op = iter->second;
2384 key_to_marker.erase(op.key);
2385 reset_need_retry(op.key);
2386 if (op.is_olh) {
2387 pending_olh.erase(op.key.name);
2388 }
2389 marker_to_op.erase(iter);
2390 }
2391
2392public:
2393 RGWBucketIncSyncShardMarkerTrack(RGWDataSyncEnv *_sync_env,
2394 const string& _marker_oid,
2395 const rgw_bucket_shard_inc_sync_marker& _marker) : RGWSyncShardMarkerTrack(BUCKET_SYNC_UPDATE_MARKER_WINDOW),
2396 sync_env(_sync_env),
2397 marker_oid(_marker_oid),
2398 sync_marker(_marker) {}
2399
2400 RGWCoroutine *store_marker(const string& new_marker, uint64_t index_pos, const real_time& timestamp) override {
2401 sync_marker.position = new_marker;
2402
2403 map<string, bufferlist> attrs;
2404 sync_marker.encode_attr(attrs);
2405
2406 RGWRados *store = sync_env->store;
2407
2408 ldout(sync_env->cct, 20) << __func__ << "(): updating marker marker_oid=" << marker_oid << " marker=" << new_marker << dendl;
2409 return new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados,
2410 store,
2411 rgw_raw_obj(store->get_zone_params().log_pool, marker_oid),
2412 attrs);
2413 }
2414
2415   /*
2416    * create an index from key -> <op, marker>, and from marker -> key.
2417    * this ensures that we only have one entry for any key that is in
2418    * use. this is needed when doing incremental sync of data, since we
2419    * don't want to run multiple concurrent sync operations for the same
2420    * bucket shard.
2421    * it also makes sure that we don't run concurrent operations on the
2422    * same key with different ops.
2423    */
2424 bool index_key_to_marker(const rgw_obj_key& key, const string& marker, bool is_olh) {
2425 auto result = key_to_marker.emplace(key, marker);
2426 if (!result.second) { // exists
2427 set_need_retry(key);
2428 return false;
2429 }
2430 marker_to_op[marker] = operation{key, is_olh};
2431 if (is_olh) {
2432 // prevent other olh ops from starting on this object name
2433 pending_olh.insert(key.name);
2434 }
2435 return true;
2436 }
2437
2438 bool can_do_op(const rgw_obj_key& key, bool is_olh) {
2439 // serialize olh ops on the same object name
2440 if (is_olh && pending_olh.count(key.name)) {
2441 ldout(sync_env->cct, 20) << __func__ << "(): sync of " << key << " waiting for pending olh op" << dendl;
2442 return false;
2443 }
2444 return (key_to_marker.find(key) == key_to_marker.end());
2445 }
2446
2447 RGWOrderCallCR *allocate_order_control_cr() {
2448 return new RGWLastCallerWinsCR(sync_env->cct);
2449 }
2450};
2451
2452template <class T, class K>
2453class RGWBucketSyncSingleEntryCR : public RGWCoroutine {
2454 RGWDataSyncEnv *sync_env;
2455
2456 RGWBucketInfo *bucket_info;
2457 const rgw_bucket_shard& bs;
2458
2459 rgw_obj_key key;
2460 bool versioned;
91327a77 2461 boost::optional<uint64_t> versioned_epoch;
2462 rgw_bucket_entry_owner owner;
2463 real_time timestamp;
2464 RGWModifyOp op;
2465 RGWPendingState op_state;
2466
2467 T entry_marker;
2468 RGWSyncShardMarkerTrack<T, K> *marker_tracker;
2469
2470 int sync_status;
2471
2472 stringstream error_ss;
2473
2474 RGWDataSyncDebugLogger logger;
2475
2476 bool error_injection;
2477
2478 RGWDataSyncModule *data_sync_module;
2479
2480 rgw_zone_set zones_trace;
2481
2482public:
2483 RGWBucketSyncSingleEntryCR(RGWDataSyncEnv *_sync_env,
2484 RGWBucketInfo *_bucket_info,
2485 const rgw_bucket_shard& bs,
2486 const rgw_obj_key& _key, bool _versioned,
2487 boost::optional<uint64_t> _versioned_epoch,
2488 real_time& _timestamp,
2489 const rgw_bucket_entry_owner& _owner,
2490 RGWModifyOp _op, RGWPendingState _op_state,
31f18b77 2491 const T& _entry_marker, RGWSyncShardMarkerTrack<T, K> *_marker_tracker, rgw_zone_set& _zones_trace) : RGWCoroutine(_sync_env->cct),
2492 sync_env(_sync_env),
2493 bucket_info(_bucket_info), bs(bs),
2494 key(_key), versioned(_versioned), versioned_epoch(_versioned_epoch),
2495 owner(_owner),
2496 timestamp(_timestamp), op(_op),
2497 op_state(_op_state),
2498 entry_marker(_entry_marker),
2499 marker_tracker(_marker_tracker),
31f18b77 2500 sync_status(0){
7c673cae 2501 stringstream ss;
91327a77 2502 ss << bucket_shard_str{bs} << "/" << key << "[" << versioned_epoch.value_or(0) << "]";
2503 set_description() << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state;
2504 ldout(sync_env->cct, 20) << "bucket sync single entry (source_zone=" << sync_env->source_zone << ") b=" << ss.str() << " log_entry=" << entry_marker << " op=" << (int)op << " op_state=" << (int)op_state << dendl;
2505 set_status("init");
2506
2507 logger.init(sync_env, "Object", ss.str());
2508
2509 error_injection = (sync_env->cct->_conf->rgw_sync_data_inject_err_probability > 0);
2510
2511 data_sync_module = sync_env->sync_module->get_data_handler();
2512
2513 zones_trace = _zones_trace;
2514 zones_trace.insert(sync_env->store->get_zone().id);
2515 }
2516
2517 int operate() override {
2518 reenter(this) {
2519 /* skip entries that are not complete */
2520 if (op_state != CLS_RGW_STATE_COMPLETE) {
2521 goto done;
2522 }
2523 do {
2524 yield {
2525 marker_tracker->reset_need_retry(key);
2526 if (key.name.empty()) {
2527 /* shouldn't happen */
2528 set_status("skipping empty entry");
2529 ldout(sync_env->cct, 0) << "ERROR: " << __func__ << "(): entry with empty obj name, skipping" << dendl;
2530 goto done;
2531 }
2532 if (error_injection &&
2533 rand() % 10000 < cct->_conf->rgw_sync_data_inject_err_probability * 10000.0) {
2534 ldout(sync_env->cct, 0) << __func__ << ": injecting data sync error on key=" << key.name << dendl;
2535 retcode = -EIO;
2536 } else if (op == CLS_RGW_OP_ADD ||
2537 op == CLS_RGW_OP_LINK_OLH) {
7c673cae 2538 set_status("syncing obj");
91327a77 2539 ldout(sync_env->cct, 5) << "bucket sync: sync obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]" << dendl;
7c673cae 2540 logger.log("fetch");
31f18b77 2541 call(data_sync_module->sync_object(sync_env, *bucket_info, key, versioned_epoch, &zones_trace));
2542 } else if (op == CLS_RGW_OP_DEL || op == CLS_RGW_OP_UNLINK_INSTANCE) {
2543 set_status("removing obj");
2544 if (op == CLS_RGW_OP_UNLINK_INSTANCE) {
2545 versioned = true;
2546 }
2547 logger.log("remove");
91327a77 2548 call(data_sync_module->remove_object(sync_env, *bucket_info, key, timestamp, versioned, versioned_epoch.value_or(0), &zones_trace));
2549 } else if (op == CLS_RGW_OP_LINK_OLH_DM) {
2550 logger.log("creating delete marker");
2551 set_status("creating delete marker");
2552 ldout(sync_env->cct, 10) << "creating delete marker: obj: " << sync_env->source_zone << "/" << bucket_info->bucket << "/" << key << "[" << versioned_epoch.value_or(0) << "]" << dendl;
2553 call(data_sync_module->create_delete_marker(sync_env, *bucket_info, key, timestamp, owner, versioned, versioned_epoch.value_or(0), &zones_trace));
2554 }
2555 }
2556 } while (marker_tracker->need_retry(key));
2557 {
2558 stringstream ss;
2559 if (retcode >= 0) {
2560 ss << "done";
2561 } else {
2562 ss << "done, retcode=" << retcode;
2563 }
2564 logger.log(ss.str());
2565 }
2566
2567 if (retcode < 0 && retcode != -ENOENT) {
2568 set_status() << "failed to sync obj; retcode=" << retcode;
2569 ldout(sync_env->cct, 0) << "ERROR: failed to sync object: "
2570 << bucket_shard_str{bs} << "/" << key.name << dendl;
2571 error_ss << bucket_shard_str{bs} << "/" << key.name;
2572 sync_status = retcode;
2573 }
2574 if (!error_ss.str().empty()) {
28e407b8 2575 yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", error_ss.str(), -retcode, string("failed to sync object") + cpp_strerror(-sync_status)));
2576 }
2577done:
2578 if (sync_status == 0) {
2579 /* update marker */
2580 set_status() << "calling marker_tracker->finish(" << entry_marker << ")";
2581 yield call(marker_tracker->finish(entry_marker));
2582 sync_status = retcode;
2583 }
2584 if (sync_status < 0) {
2585 return set_cr_error(sync_status);
2586 }
2587 return set_cr_done();
2588 }
2589 return 0;
2590 }
2591};
2592
2593#define BUCKET_SYNC_SPAWN_WINDOW 20
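// both full and incremental bucket sync spawn at most
// BUCKET_SYNC_SPAWN_WINDOW per-entry coroutines at a time, collecting
// finished children before spawning more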
2594
2595class RGWBucketShardFullSyncCR : public RGWCoroutine {
2596 RGWDataSyncEnv *sync_env;
2597 const rgw_bucket_shard& bs;
2598 RGWBucketInfo *bucket_info;
2599 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
2600 bucket_list_result list_result;
2601 list<bucket_list_entry>::iterator entries_iter;
91327a77 2602 rgw_bucket_shard_sync_info& sync_info;
2603 RGWBucketFullSyncShardMarkerTrack marker_tracker;
2604 rgw_obj_key list_marker;
2605 bucket_list_entry *entry{nullptr};
2606 RGWModifyOp op{CLS_RGW_OP_ADD};
2607
2608 int total_entries{0};
2609
2610 int sync_status{0};
2611
2612 const string& status_oid;
2613
2614 RGWDataSyncDebugLogger logger;
31f18b77 2615 rgw_zone_set zones_trace;
2616public:
2617 RGWBucketShardFullSyncCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
2618 RGWBucketInfo *_bucket_info,
2619 const std::string& status_oid,
2620 RGWContinuousLeaseCR *lease_cr,
91327a77 2621 rgw_bucket_shard_sync_info& sync_info)
7c673cae 2622 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
2623 bucket_info(_bucket_info), lease_cr(lease_cr), sync_info(sync_info),
2624 marker_tracker(sync_env, status_oid, sync_info.full_marker),
2625 status_oid(status_oid) {
2626 logger.init(sync_env, "BucketFull", bs.get_key());
31f18b77 2627 zones_trace.insert(sync_env->source_zone);
2628 }
2629
2630 int operate() override;
2631};
2632
2633int RGWBucketShardFullSyncCR::operate()
2634{
2635 int ret;
2636 reenter(this) {
91327a77 2637 list_marker = sync_info.full_marker.position;
7c673cae 2638
91327a77 2639 total_entries = sync_info.full_marker.count;
7c673cae
FG
2640 do {
2641 if (!lease_cr->is_locked()) {
2642 drain_all();
2643 return set_cr_error(-ECANCELED);
2644 }
2645 set_status("listing remote bucket");
2646 ldout(sync_env->cct, 20) << __func__ << "(): listing bucket for full sync" << dendl;
2647 yield call(new RGWListBucketShardCR(sync_env, bs, list_marker,
2648 &list_result));
2649 if (retcode < 0 && retcode != -ENOENT) {
2650 set_status("failed bucket listing, going down");
2651 drain_all();
2652 return set_cr_error(retcode);
2653 }
2654 entries_iter = list_result.entries.begin();
2655 for (; entries_iter != list_result.entries.end(); ++entries_iter) {
2656 if (!lease_cr->is_locked()) {
2657 drain_all();
2658 return set_cr_error(-ECANCELED);
2659 }
2660 ldout(sync_env->cct, 20) << "[full sync] syncing object: "
2661 << bucket_shard_str{bs} << "/" << entries_iter->key << dendl;
2662 entry = &(*entries_iter);
2663 total_entries++;
2664 list_marker = entries_iter->key;
2665 if (!marker_tracker.start(entry->key, total_entries, real_time())) {
2666 ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << entry->key << ". Duplicate entry?" << dendl;
2667 } else {
2668 op = (entry->key.instance.empty() || entry->key.instance == "null" ? CLS_RGW_OP_ADD : CLS_RGW_OP_LINK_OLH);
2669 using SyncCR = RGWBucketSyncSingleEntryCR<rgw_obj_key, rgw_obj_key>;
2670 yield spawn(new SyncCR(sync_env, bucket_info, bs, entry->key,
2671 false, /* versioned, only matters for object removal */
2672 entry->versioned_epoch, entry->mtime,
2673 entry->owner, op, CLS_RGW_STATE_COMPLETE,
31f18b77 2674 entry->key, &marker_tracker, zones_trace),
2675 false);
2676 }
2677 while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
2678 yield wait_for_child();
2679 bool again = true;
2680 while (again) {
2681 again = collect(&ret, nullptr);
2682 if (ret < 0) {
91327a77 2683 ldout(sync_env->cct, 10) << "a sync operation returned error" << dendl;
2684 sync_status = ret;
2685 /* we have reported this error */
2686 }
2687 }
2688 }
2689 }
2690 } while (list_result.is_truncated && sync_status == 0);
2691 set_status("done iterating over all objects");
2692 /* wait for all operations to complete */
2693 while (num_spawned()) {
2694 yield wait_for_child();
2695 bool again = true;
2696 while (again) {
2697 again = collect(&ret, nullptr);
2698 if (ret < 0) {
91327a77 2699 ldout(sync_env->cct, 10) << "a sync operation returned error" << dendl;
2700 sync_status = ret;
2701 /* we have reported this error */
2702 }
2703 }
2704 }
2705 if (!lease_cr->is_locked()) {
2706 return set_cr_error(-ECANCELED);
2707 }
2708 /* update sync state to incremental */
2709 if (sync_status == 0) {
2710 yield {
91327a77 2711 sync_info.state = rgw_bucket_shard_sync_info::StateIncrementalSync;
7c673cae 2712 map<string, bufferlist> attrs;
91327a77 2713 sync_info.encode_state_attr(attrs);
2714 RGWRados *store = sync_env->store;
2715 call(new RGWSimpleRadosWriteAttrsCR(sync_env->async_rados, store,
2716 rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
2717 attrs));
2718 }
2719 } else {
91327a77 2720       ldout(sync_env->cct, 10) << "failure in sync, backing out (sync_status=" << sync_status << ")" << dendl;
2721 }
2722 if (retcode < 0 && sync_status == 0) { /* actually tried to set incremental state and failed */
2723 ldout(sync_env->cct, 0) << "ERROR: failed to set sync state on bucket "
2724 << bucket_shard_str{bs} << " retcode=" << retcode << dendl;
2725 return set_cr_error(retcode);
2726 }
2727 if (sync_status < 0) {
2728 return set_cr_error(sync_status);
2729 }
2730 return set_cr_done();
2731 }
2732 return 0;
2733}
2734
2735static bool has_olh_epoch(RGWModifyOp op) {
2736 return op == CLS_RGW_OP_LINK_OLH || op == CLS_RGW_OP_UNLINK_INSTANCE;
2737}
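// olh (versioned-object) operations carry an olh_epoch that must be
// applied in order, so the incremental sync below squashes and
// serializes them differently from plain object writes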
2738
2739class RGWBucketShardIncrementalSyncCR : public RGWCoroutine {
2740 RGWDataSyncEnv *sync_env;
2741 const rgw_bucket_shard& bs;
2742 RGWBucketInfo *bucket_info;
2743 boost::intrusive_ptr<RGWContinuousLeaseCR> lease_cr;
2744 list<rgw_bi_log_entry> list_result;
91327a77 2745 list<rgw_bi_log_entry>::iterator entries_iter, entries_end;
7c673cae 2746 map<pair<string, string>, pair<real_time, RGWModifyOp> > squash_map;
91327a77 2747 rgw_bucket_shard_sync_info& sync_info;
2748 rgw_obj_key key;
2749 rgw_bi_log_entry *entry{nullptr};
2750 RGWBucketIncSyncShardMarkerTrack marker_tracker;
2751 bool updated_status{false};
2752 const string& status_oid;
31f18b77 2753 const string& zone_id;
2754
2755 string cur_id;
2756
2757 RGWDataSyncDebugLogger logger;
2758
2759 int sync_status{0};
c07f9fc5 2760 bool syncstopped{false};
2761
2762public:
2763 RGWBucketShardIncrementalSyncCR(RGWDataSyncEnv *_sync_env,
2764 const rgw_bucket_shard& bs,
2765 RGWBucketInfo *_bucket_info,
2766 const std::string& status_oid,
2767 RGWContinuousLeaseCR *lease_cr,
91327a77 2768 rgw_bucket_shard_sync_info& sync_info)
7c673cae 2769 : RGWCoroutine(_sync_env->cct), sync_env(_sync_env), bs(bs),
2770 bucket_info(_bucket_info), lease_cr(lease_cr), sync_info(sync_info),
2771 marker_tracker(sync_env, status_oid, sync_info.inc_marker),
2772 status_oid(status_oid), zone_id(_sync_env->store->get_zone().id)
2773 {
2774 set_description() << "bucket shard incremental sync bucket="
2775 << bucket_shard_str{bs};
2776 set_status("init");
2777 logger.init(sync_env, "BucketInc", bs.get_key());
2778 }
2779
2780 int operate() override;
2781};
2782
2783int RGWBucketShardIncrementalSyncCR::operate()
2784{
2785 int ret;
2786 reenter(this) {
2787 do {
2788 if (!lease_cr->is_locked()) {
2789 drain_all();
2790 return set_cr_error(-ECANCELED);
2791 }
2792       ldout(sync_env->cct, 20) << __func__ << "(): listing bilog for incremental sync; position=" << sync_info.inc_marker.position << dendl;
2793 set_status() << "listing bilog; position=" << sync_info.inc_marker.position;
2794 yield call(new RGWListBucketIndexLogCR(sync_env, bs, sync_info.inc_marker.position,
7c673cae 2795 &list_result));
2796 if (retcode < 0 && retcode != -ENOENT) {
2797 /* wait for all operations to complete */
7c673cae 2798 drain_all();
91327a77 2799 return set_cr_error(retcode);
2800 }
2801 squash_map.clear();
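      // first pass over the listed bilog entries: record, per (object,
      // instance), the newest applicable op so that the second pass can
      // skip superseded entries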
2802 entries_iter = list_result.begin();
2803 entries_end = list_result.end();
2804 for (; entries_iter != entries_end; ++entries_iter) {
2805 auto e = *entries_iter;
2806 if (e.op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP) {
2807 ldout(sync_env->cct, 20) << "syncstop on " << e.timestamp << dendl;
c07f9fc5 2808 syncstopped = true;
2809           entries_end = entries_iter; // don't sync past here
2810 break;
c07f9fc5 2811 }
91327a77 2812 if (e.op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
2813 continue;
2814 }
2815 if (e.op == CLS_RGW_OP_CANCEL) {
2816 continue;
2817 }
2818 if (e.state != CLS_RGW_STATE_COMPLETE) {
2819 continue;
2820 }
2821 if (e.zones_trace.find(zone_id) != e.zones_trace.end()) {
2822 continue;
2823 }
7c673cae 2824 auto& squash_entry = squash_map[make_pair(e.object, e.instance)];
2825 // don't squash over olh entries - we need to apply their olh_epoch
2826 if (has_olh_epoch(squash_entry.second) && !has_olh_epoch(e.op)) {
2827 continue;
2828 }
2829 if (squash_entry.first <= e.timestamp) {
2830 squash_entry = make_pair<>(e.timestamp, e.op);
2831 }
2832 }
c07f9fc5 2833
7c673cae 2834 entries_iter = list_result.begin();
91327a77 2835 for (; entries_iter != entries_end; ++entries_iter) {
2836 if (!lease_cr->is_locked()) {
2837 drain_all();
2838 return set_cr_error(-ECANCELED);
2839 }
2840 entry = &(*entries_iter);
2841 {
2842 ssize_t p = entry->id.find('#'); /* entries might have explicit shard info in them, e.g., 6#00000000004.94.3 */
2843 if (p < 0) {
2844 cur_id = entry->id;
2845 } else {
2846 cur_id = entry->id.substr(p + 1);
2847 }
2848 }
91327a77 2849 sync_info.inc_marker.position = cur_id;
7c673cae 2850
c07f9fc5 2851 if (entry->op == RGWModifyOp::CLS_RGW_OP_SYNCSTOP || entry->op == RGWModifyOp::CLS_RGW_OP_RESYNC) {
91327a77 2852 ldout(sync_env->cct, 20) << "detected syncstop or resync on " << entries_iter->timestamp << ", skipping entry" << dendl;
2853 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2854 continue;
2855 }
2856
2857 if (!key.set(rgw_obj_index_key{entry->object, entry->instance})) {
2858 set_status() << "parse_raw_oid() on " << entry->object << " returned false, skipping entry";
2859 ldout(sync_env->cct, 20) << "parse_raw_oid() on " << entry->object << " returned false, skipping entry" << dendl;
2860 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2861 continue;
2862 }
2863
2864 ldout(sync_env->cct, 20) << "parsed entry: id=" << cur_id << " iter->object=" << entry->object << " iter->instance=" << entry->instance << " name=" << key.name << " instance=" << key.instance << " ns=" << key.ns << dendl;
2865
2866 if (!key.ns.empty()) {
2867 set_status() << "skipping entry in namespace: " << entry->object;
2868 ldout(sync_env->cct, 20) << "skipping entry in namespace: " << entry->object << dendl;
2869 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2870 continue;
2871 }
2872
2873 set_status() << "got entry.id=" << cur_id << " key=" << key << " op=" << (int)entry->op;
2874 if (entry->op == CLS_RGW_OP_CANCEL) {
2875 set_status() << "canceled operation, skipping";
2876 ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
2877 << bucket_shard_str{bs} << "/" << key << ": canceled operation" << dendl;
2878 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2879 continue;
2880 }
2881 if (entry->state != CLS_RGW_STATE_COMPLETE) {
2882 set_status() << "non-complete operation, skipping";
2883 ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
2884 << bucket_shard_str{bs} << "/" << key << ": non-complete operation" << dendl;
2885 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2886 continue;
2887 }
2888 if (entry->zones_trace.find(zone_id) != entry->zones_trace.end()) {
2889 set_status() << "redundant operation, skipping";
2890 ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
2891                                  << bucket_shard_str{bs} << "/" << key << ": redundant operation" << dendl;
2892 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2893 continue;
2894 }
2895 if (make_pair<>(entry->timestamp, entry->op) != squash_map[make_pair(entry->object, entry->instance)]) {
2896 set_status() << "squashed operation, skipping";
2897 ldout(sync_env->cct, 20) << "[inc sync] skipping object: "
2898 << bucket_shard_str{bs} << "/" << key << ": squashed operation" << dendl;
91327a77 2899 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2900 continue;
2901 }
2902 ldout(sync_env->cct, 20) << "[inc sync] syncing object: "
2903 << bucket_shard_str{bs} << "/" << key << dendl;
2904 updated_status = false;
91327a77 2905 while (!marker_tracker.can_do_op(key, has_olh_epoch(entry->op))) {
2906 if (!updated_status) {
2907 set_status() << "can't do op, conflicting inflight operation";
2908 updated_status = true;
2909 }
2910 ldout(sync_env->cct, 5) << *this << ": [inc sync] can't do op on key=" << key << " need to wait for conflicting operation to complete" << dendl;
2911 yield wait_for_child();
2912 bool again = true;
2913 while (again) {
2914 again = collect(&ret, nullptr);
2915 if (ret < 0) {
2916 ldout(sync_env->cct, 0) << "ERROR: a child operation returned error (ret=" << ret << ")" << dendl;
2917 sync_status = ret;
2918 /* we have reported this error */
2919 }
2920 }
2921 if (sync_status != 0)
2922 break;
2923 }
2924 if (sync_status != 0) {
2925 /* get error, stop */
2926 break;
7c673cae 2927 }
91327a77 2928 if (!marker_tracker.index_key_to_marker(key, cur_id, has_olh_epoch(entry->op))) {
2929 set_status() << "can't do op, sync already in progress for object";
2930 ldout(sync_env->cct, 20) << __func__ << ": skipping sync of entry: " << cur_id << ":" << key << " sync already in progress for object" << dendl;
2931 marker_tracker.try_update_high_marker(cur_id, 0, entry->timestamp);
2932 continue;
2933 }
2934 // yield {
2935 set_status() << "start object sync";
2936 if (!marker_tracker.start(cur_id, 0, entry->timestamp)) {
2937 ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << cur_id << ". Duplicate entry?" << dendl;
2938 } else {
91327a77 2939 boost::optional<uint64_t> versioned_epoch;
2940 rgw_bucket_entry_owner owner(entry->owner, entry->owner_display_name);
2941 if (entry->ver.pool < 0) {
2942 versioned_epoch = entry->ver.epoch;
2943 }
2944 ldout(sync_env->cct, 20) << __func__ << "(): entry->timestamp=" << entry->timestamp << dendl;
2945 using SyncCR = RGWBucketSyncSingleEntryCR<string, rgw_obj_key>;
2946 spawn(new SyncCR(sync_env, bucket_info, bs, key,
2947 entry->is_versioned(), versioned_epoch,
2948 entry->timestamp, owner, entry->op, entry->state,
31f18b77 2949 cur_id, &marker_tracker, entry->zones_trace),
2950 false);
2951 }
2952 // }
2953 while (num_spawned() > BUCKET_SYNC_SPAWN_WINDOW) {
2954 set_status() << "num_spawned() > spawn_window";
2955 yield wait_for_child();
2956 bool again = true;
2957 while (again) {
2958 again = collect(&ret, nullptr);
2959 if (ret < 0) {
91327a77 2960 ldout(sync_env->cct, 10) << "a sync operation returned error" << dendl;
2961 sync_status = ret;
2962 /* we have reported this error */
2963 }
2964 /* not waiting for child here */
2965 }
2966 }
2967 }
91327a77 2968 } while (!list_result.empty() && sync_status == 0 && !syncstopped);
c07f9fc5 2969
2970 while (num_spawned()) {
2971 yield wait_for_child();
2972 bool again = true;
2973 while (again) {
2974 again = collect(&ret, nullptr);
2975 if (ret < 0) {
91327a77 2976 ldout(sync_env->cct, 10) << "a sync operation returned error" << dendl;
2977 sync_status = ret;
2978 /* we have reported this error */
2979 }
2980 /* not waiting for child here */
2981 }
2982 }
2983
2984 if (syncstopped) {
2985 // transition back to StateInit in RGWRunBucketSyncCoroutine. if sync is
2986 // still disabled, we'll delete the sync status object. otherwise we'll
2987 // restart full sync to catch any changes that happened while sync was
2988 // disabled
2989 sync_info.state = rgw_bucket_shard_sync_info::StateInit;
2990 return set_cr_done();
2991 }
2992
2993 yield call(marker_tracker.flush());
2994 if (retcode < 0) {
2995 ldout(sync_env->cct, 0) << "ERROR: marker_tracker.flush() returned retcode=" << retcode << dendl;
2996 return set_cr_error(retcode);
2997 }
2998 if (sync_status < 0) {
91327a77 2999       ldout(sync_env->cct, 10) << "failure in sync, backing out (sync_status=" << sync_status << ")" << dendl;
3000 return set_cr_error(sync_status);
3001 }
3002 return set_cr_done();
3003 }
3004 return 0;
3005}
3006
3007int RGWRunBucketSyncCoroutine::operate()
3008{
3009 reenter(this) {
3010 yield {
3011 set_status("acquiring sync lock");
3012 auto store = sync_env->store;
3013 lease_cr.reset(new RGWContinuousLeaseCR(sync_env->async_rados, store,
3014 rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
3015 "sync_lock",
3016 cct->_conf->rgw_sync_lease_period,
3017 this));
3018 lease_stack.reset(spawn(lease_cr.get(), false));
3019 }
3020 while (!lease_cr->is_locked()) {
3021 if (lease_cr->is_done()) {
3022 ldout(cct, 5) << "lease cr failed, done early" << dendl;
3023 set_status("lease lock failed, early abort");
3024 return set_cr_error(lease_cr->get_ret_status());
3025 }
3026 set_sleeping(true);
3027 yield;
3028 }
3029
3030 yield call(new RGWReadBucketSyncStatusCoroutine(sync_env, bs, &sync_status));
3031 if (retcode < 0 && retcode != -ENOENT) {
3032 ldout(sync_env->cct, 0) << "ERROR: failed to read sync status for bucket="
3033 << bucket_shard_str{bs} << dendl;
3034 lease_cr->go_down();
3035 drain_all();
3036 return set_cr_error(retcode);
3037 }
3038
3039 ldout(sync_env->cct, 20) << __func__ << "(): sync status for bucket "
3040 << bucket_shard_str{bs} << ": " << sync_status.state << dendl;
3041
3042 yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
3043 if (retcode == -ENOENT) {
3044 /* bucket instance info has not been synced in yet, fetch it now */
3045 yield {
3046 ldout(sync_env->cct, 10) << "no local info for bucket "
3047 << bucket_str{bs.bucket} << ": fetching metadata" << dendl;
3048 string raw_key = string("bucket.instance:") + bs.bucket.get_key();
3049
3050 meta_sync_env.init(cct, sync_env->store, sync_env->store->rest_master_conn, sync_env->async_rados, sync_env->http_manager, sync_env->error_logger);
3051
3052 call(new RGWMetaSyncSingleEntryCR(&meta_sync_env, raw_key,
3053 string() /* no marker */,
3054 MDLOG_STATUS_COMPLETE,
3055 NULL /* no marker tracker */));
3056 }
3057 if (retcode < 0) {
3058 ldout(sync_env->cct, 0) << "ERROR: failed to fetch bucket instance info for " << bucket_str{bs.bucket} << dendl;
3059 lease_cr->go_down();
3060 drain_all();
3061 return set_cr_error(retcode);
3062 }
3063
3064 yield call(new RGWGetBucketInstanceInfoCR(sync_env->async_rados, sync_env->store, bs.bucket, &bucket_info));
3065 }
3066 if (retcode < 0) {
3067 ldout(sync_env->cct, 0) << "ERROR: failed to retrieve bucket info for bucket=" << bucket_str{bs.bucket} << dendl;
3068 lease_cr->go_down();
3069 drain_all();
3070 return set_cr_error(retcode);
3071 }
3072
3073 do {
3074 if (sync_status.state == rgw_bucket_shard_sync_info::StateInit) {
3075 yield call(new RGWInitBucketShardSyncStatusCoroutine(sync_env, bs, sync_status));
3076 if (retcode < 0) {
3077 ldout(sync_env->cct, 0) << "ERROR: init sync on " << bucket_shard_str{bs}
3078 << " failed, retcode=" << retcode << dendl;
3079 lease_cr->go_down();
3080 drain_all();
3081 return set_cr_error(retcode);
3082 }
7c673cae 3083 }
7c673cae 3084
3085 if (sync_status.state == rgw_bucket_shard_sync_info::StateFullSync) {
3086 yield call(new RGWBucketShardFullSyncCR(sync_env, bs, &bucket_info,
3087 status_oid, lease_cr.get(),
3088 sync_status));
3089 if (retcode < 0) {
3090 ldout(sync_env->cct, 5) << "full sync on " << bucket_shard_str{bs}
3091 << " failed, retcode=" << retcode << dendl;
3092 lease_cr->go_down();
3093 drain_all();
3094 return set_cr_error(retcode);
3095 }
7c673cae 3096 }
7c673cae 3097
3098 if (sync_status.state == rgw_bucket_shard_sync_info::StateIncrementalSync) {
3099 yield call(new RGWBucketShardIncrementalSyncCR(sync_env, bs, &bucket_info,
3100 status_oid, lease_cr.get(),
3101 sync_status));
3102 if (retcode < 0) {
3103 ldout(sync_env->cct, 5) << "incremental sync on " << bucket_shard_str{bs}
3104 << " failed, retcode=" << retcode << dendl;
3105 lease_cr->go_down();
3106 drain_all();
3107 return set_cr_error(retcode);
3108 }
7c673cae 3109 }
3110 // loop back to previous states unless incremental sync returns normally
3111 } while (sync_status.state != rgw_bucket_shard_sync_info::StateIncrementalSync);
3112
3113 lease_cr->go_down();
3114 drain_all();
3115 return set_cr_done();
3116 }
3117
3118 return 0;
3119}
3120
3121RGWCoroutine *RGWRemoteBucketLog::run_sync_cr()
3122{
3123 return new RGWRunBucketSyncCoroutine(&sync_env, bs);
3124}
3125
3126int RGWBucketSyncStatusManager::init()
3127{
3128 conn = store->get_zone_conn_by_id(source_zone);
3129 if (!conn) {
3130 ldout(store->ctx(), 0) << "connection object to zone " << source_zone << " does not exist" << dendl;
3131 return -EINVAL;
3132 }
3133
3134 int ret = http_manager.set_threaded();
3135 if (ret < 0) {
3136 ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
3137 return ret;
3138 }
3139
3140
3141 const string key = bucket.get_key();
3142
3143 rgw_http_param_pair pairs[] = { { "key", key.c_str() },
3144 { NULL, NULL } };
3145
3146 string path = string("/admin/metadata/bucket.instance");
3147
3148 bucket_instance_meta_info result;
3149 ret = cr_mgr.run(new RGWReadRESTResourceCR<bucket_instance_meta_info>(store->ctx(), conn, &http_manager, path, pairs, &result));
3150 if (ret < 0) {
3151 ldout(store->ctx(), 0) << "ERROR: failed to fetch bucket metadata info from zone=" << source_zone << " path=" << path << " key=" << key << " ret=" << ret << dendl;
3152 return ret;
3153 }
3154
3155 RGWBucketInfo& bi = result.data.get_bucket_info();
3156 num_shards = bi.num_shards;
3157
3158 error_logger = new RGWSyncErrorLogger(store, RGW_SYNC_ERROR_LOG_SHARD_PREFIX, ERROR_LOGGER_SHARDS);
3159
3160 sync_module.reset(new RGWDefaultSyncModuleInstance());
3161
3162 int effective_num_shards = (num_shards ? num_shards : 1);
3163
3164 auto async_rados = store->get_async_rados();
3165
3166 for (int i = 0; i < effective_num_shards; i++) {
3167 RGWRemoteBucketLog *l = new RGWRemoteBucketLog(store, this, async_rados, &http_manager);
3168 ret = l->init(source_zone, conn, bucket, (num_shards ? i : -1), error_logger, sync_module);
3169 if (ret < 0) {
3170 ldout(store->ctx(), 0) << "ERROR: failed to initialize RGWRemoteBucketLog object" << dendl;
3171 return ret;
3172 }
3173 source_logs[i] = l;
3174 }
3175
3176 return 0;
3177}
3178
3179int RGWBucketSyncStatusManager::init_sync_status()
3180{
3181 list<RGWCoroutinesStack *> stacks;
3182
3183 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
3184 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
3185 RGWRemoteBucketLog *l = iter->second;
3186 stack->call(l->init_sync_status_cr());
3187
3188 stacks.push_back(stack);
3189 }
3190
3191 return cr_mgr.run(stacks);
3192}
3193
3194int RGWBucketSyncStatusManager::read_sync_status()
3195{
3196 list<RGWCoroutinesStack *> stacks;
3197
3198 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
3199 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
3200 RGWRemoteBucketLog *l = iter->second;
3201 stack->call(l->read_sync_status_cr(&sync_status[iter->first]));
3202
3203 stacks.push_back(stack);
3204 }
3205
3206 int ret = cr_mgr.run(stacks);
3207 if (ret < 0) {
3208 ldout(store->ctx(), 0) << "ERROR: failed to read sync status for "
3209 << bucket_str{bucket} << dendl;
3210 return ret;
3211 }
3212
3213 return 0;
3214}
3215
3216int RGWBucketSyncStatusManager::run()
3217{
3218 list<RGWCoroutinesStack *> stacks;
3219
3220 for (map<int, RGWRemoteBucketLog *>::iterator iter = source_logs.begin(); iter != source_logs.end(); ++iter) {
3221 RGWCoroutinesStack *stack = new RGWCoroutinesStack(store->ctx(), &cr_mgr);
3222 RGWRemoteBucketLog *l = iter->second;
3223 stack->call(l->run_sync_cr());
3224
3225 stacks.push_back(stack);
3226 }
3227
3228 int ret = cr_mgr.run(stacks);
3229 if (ret < 0) {
3230 ldout(store->ctx(), 0) << "ERROR: failed to read sync status for "
3231 << bucket_str{bucket} << dendl;
3232 return ret;
3233 }
3234
3235 return 0;
3236}
3237
string RGWBucketSyncStatusManager::status_oid(const string& source_zone,
                                              const rgw_bucket_shard& bs)
{
  return bucket_status_oid_prefix + "." + source_zone + ":" + bs.get_key();
}
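// For example (hypothetical values): for source zone "zone-a" and a bucket
// shard whose key is "mybucket:default.1234.5:7", the per-shard status
// object would be named "bucket.sync-status.zone-a:mybucket:default.1234.5:7".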

class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR {
  static constexpr int max_concurrent_shards = 16;
  RGWRados *const store;
  RGWDataSyncEnv *const env;
  const int num_shards;
  rgw_bucket_shard bs;

  using Vector = std::vector<rgw_bucket_shard_sync_info>;
  Vector::iterator i, end;

 public:
  RGWCollectBucketSyncStatusCR(RGWRados *store, RGWDataSyncEnv *env,
                               int num_shards, const rgw_bucket& bucket,
                               Vector *status)
    : RGWShardCollectCR(store->ctx(), max_concurrent_shards),
      store(store), env(env), num_shards(num_shards),
      bs(bucket, num_shards > 0 ? 0 : -1), // start at shard 0 or -1
      i(status->begin()), end(status->end())
  {}

  bool spawn_next() override {
    if (i == end) {
      return false;
    }
    spawn(new RGWReadBucketSyncStatusCoroutine(env, bs, &*i), false);
    ++i;
    ++bs.shard_id;
    return true;
  }
};

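// RGWShardCollectCR calls spawn_next() until it returns false, keeping at
// most max_concurrent_shards (16 here) status reads in flight at a time, so
// the collection below stays bounded even for buckets with many shards.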
int rgw_bucket_sync_status(RGWRados *store, const std::string& source_zone,
                           const RGWBucketInfo& bucket_info,
                           std::vector<rgw_bucket_shard_sync_info> *status)
{
  const auto num_shards = bucket_info.num_shards;
  status->clear();
  status->resize(std::max<size_t>(1, num_shards));

  RGWDataSyncEnv env;
  RGWSyncModuleInstanceRef module; // null sync module
  env.init(store->ctx(), store, nullptr, store->get_async_rados(),
           nullptr, nullptr, source_zone, module);

  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
  return crs.run(new RGWCollectBucketSyncStatusCR(store, &env, num_shards,
                                                  bucket_info.bucket, status));
}

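// A minimal usage sketch (hypothetical caller, not part of this file):
//
//   std::vector<rgw_bucket_shard_sync_info> status;
//   int r = rgw_bucket_sync_status(store, source_zone, bucket_info, &status);
//   if (r < 0) {
//     return r;
//   }
//   for (auto& shard : status) {
//     // inspect shard.state and the full/incremental sync markers
//   }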

// TODO: move into rgw_data_sync_trim.cc
#undef dout_prefix
#define dout_prefix (*_dout << "data trim: ")

namespace {

/// return the marker that it's safe to trim up to
const std::string& get_stable_marker(const rgw_data_sync_marker& m)
{
  return m.state == m.FullSync ? m.next_step_marker : m.marker;
}
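// Rationale: a peer still in full sync will begin consuming the log at
// next_step_marker, so entries from that point on must be retained; entries
// before it are covered by the full-sync listing. A peer in incremental
// sync has already processed everything up to its current marker.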

/// comparison operator for take_min_markers()
bool operator<(const rgw_data_sync_marker& lhs,
               const rgw_data_sync_marker& rhs)
{
  // sort by stable marker
  return get_stable_marker(lhs) < get_stable_marker(rhs);
}

/// populate the container starting with 'dest' with the minimum stable marker
/// of each shard for all of the peers in [first, last); assumes every peer
/// reports the same set of shards and that the output range can hold one
/// entry per shard
template <typename IterIn, typename IterOut>
void take_min_markers(IterIn first, IterIn last, IterOut dest)
{
  if (first == last) {
    return;
  }
  // initialize markers with the first peer's
  auto m = dest;
  for (auto &shard : first->sync_markers) {
    *m = std::move(shard.second);
    ++m;
  }
  // for remaining peers, replace with smaller markers
  for (auto p = first + 1; p != last; ++p) {
    m = dest;
    for (auto &shard : p->sync_markers) {
      if (shard.second < *m) {
        *m = std::move(shard.second);
      }
      ++m;
    }
  }
}

} // anonymous namespace

class DataLogTrimCR : public RGWCoroutine {
  RGWRados *store;
  RGWHTTPManager *http;
  const int num_shards;
  const std::string& zone_id; //< my zone id
  std::vector<rgw_data_sync_status> peer_status; //< sync status for each peer
  std::vector<rgw_data_sync_marker> min_shard_markers; //< min marker per shard
  std::vector<std::string>& last_trim; //< last trimmed marker per shard
  int ret{0};

 public:
  DataLogTrimCR(RGWRados *store, RGWHTTPManager *http,
                int num_shards, std::vector<std::string>& last_trim)
    : RGWCoroutine(store->ctx()), store(store), http(http),
      num_shards(num_shards),
      zone_id(store->get_zone().id),
      peer_status(store->zone_conn_map.size()),
      min_shard_markers(num_shards),
      last_trim(last_trim)
  {}

  int operate() override;
};
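// peer_status is sized to zone_conn_map so each peer's REST reply lands in
// its own preallocated slot, and min_shard_markers gets one entry per log
// shard before take_min_markers() fills it in.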

int DataLogTrimCR::operate()
{
  reenter(this) {
    ldout(cct, 10) << "fetching sync status for zone " << zone_id << dendl;
    set_status("fetching sync status");
    yield {
      // query data sync status from each sync peer
      rgw_http_param_pair params[] = {
        { "type", "data" },
        { "status", nullptr },
        { "source-zone", zone_id.c_str() },
        { nullptr, nullptr }
      };

      auto p = peer_status.begin();
      for (auto& c : store->zone_conn_map) {
        ldout(cct, 20) << "query sync status from " << c.first << dendl;
        using StatusCR = RGWReadRESTResourceCR<rgw_data_sync_status>;
        spawn(new StatusCR(cct, c.second, http, "/admin/log/", params, &*p),
              false);
        ++p;
      }
    }

    // must get a successful reply from all peers to consider trimming
    ret = 0;
    while (ret == 0 && num_spawned() > 0) {
      yield wait_for_child();
      collect_next(&ret);
    }
    drain_all();

    if (ret < 0) {
      ldout(cct, 4) << "failed to fetch sync status from all peers" << dendl;
      return set_cr_error(ret);
    }

    ldout(cct, 10) << "trimming log shards" << dendl;
    set_status("trimming log shards");
    yield {
      // determine the minimum marker for each shard
      take_min_markers(peer_status.begin(), peer_status.end(),
                       min_shard_markers.begin());

      for (int i = 0; i < num_shards; i++) {
        const auto& m = min_shard_markers[i];
        auto& stable = get_stable_marker(m);
        if (stable <= last_trim[i]) {
          continue;
        }
        ldout(cct, 10) << "trimming log shard " << i
            << " at marker=" << stable
            << " last_trim=" << last_trim[i] << dendl;
        using TrimCR = RGWSyncLogTrimCR;
        spawn(new TrimCR(store, store->data_log->get_oid(i),
                         stable, &last_trim[i]),
              true);
      }
    }
    return set_cr_done();
  }
  return 0;
}
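// Worked example (hypothetical markers): if peer A reports shard 0 consumed
// through "1_00021" and peer B only through "1_00013", the minimum is
// "1_00013"; the shard is trimmed to that marker, and only if it is greater
// than the last trim point already recorded for that shard.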

class DataLogTrimPollCR : public RGWCoroutine {
  RGWRados *store;
  RGWHTTPManager *http;
  const int num_shards;
  const utime_t interval; //< polling interval
  const std::string lock_oid; //< use first data log shard for lock
  const std::string lock_cookie;
  std::vector<std::string> last_trim; //< last trimmed marker per shard

 public:
  DataLogTrimPollCR(RGWRados *store, RGWHTTPManager *http,
                    int num_shards, utime_t interval)
    : RGWCoroutine(store->ctx()), store(store), http(http),
      num_shards(num_shards), interval(interval),
      lock_oid(store->data_log->get_oid(0)),
      lock_cookie(RGWSimpleRadosLockCR::gen_random_cookie(cct)),
      last_trim(num_shards)
  {}

  int operate() override;
};

int DataLogTrimPollCR::operate()
{
  reenter(this) {
    for (;;) {
      set_status("sleeping");
      wait(interval);

      // request a 'data_trim' lock that covers the entire wait interval to
      // prevent other gateways from attempting to trim for the duration
      set_status("acquiring trim lock");
      yield call(new RGWSimpleRadosLockCR(store->get_async_rados(), store,
                                          rgw_raw_obj(store->get_zone_params().log_pool, lock_oid),
                                          "data_trim", lock_cookie,
                                          interval.sec()));
      if (retcode < 0) {
        // if the lock is already held, go back to sleep and try again later
        ldout(cct, 4) << "failed to lock " << lock_oid << ", trying again in "
            << interval.sec() << "s" << dendl;
        continue;
      }

      set_status("trimming");
      yield call(new DataLogTrimCR(store, http, num_shards, last_trim));

      // note that the lock is not released. this is intentional, as it avoids
      // duplicating this work in other gateways
    }
  }
  return 0;
}
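// Because the lock's duration equals the polling interval, it simply expires
// by the time any gateway wakes for the next cycle; if this gateway dies,
// another one's next lock attempt will succeed and trimming continues there.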

RGWCoroutine* create_data_log_trim_cr(RGWRados *store,
                                      RGWHTTPManager *http,
                                      int num_shards, utime_t interval)
{
  return new DataLogTrimPollCR(store, http, num_shards, interval);
}