X-Git-Url: https://git.proxmox.com/?a=blobdiff_plain;f=ceph%2Fsrc%2Frgw%2Frgw_data_sync.cc;h=703bdd7ee25e9cc0d700dd374595956f13a43ea9;hb=28e407b858acd3bddc89f68583571f771bb42e46;hp=8fe6497f29b61a0d2ddc3b3da13b5e351189a6d6;hpb=181888fb293938ba79f4c96c14bf1459f38d18af;p=ceph.git

diff --git a/ceph/src/rgw/rgw_data_sync.cc b/ceph/src/rgw/rgw_data_sync.cc
index 8fe6497f2..703bdd7ee 100644
--- a/ceph/src/rgw/rgw_data_sync.cc
+++ b/ceph/src/rgw/rgw_data_sync.cc
@@ -18,6 +18,7 @@
 #include "rgw_bucket.h"
 #include "rgw_metadata.h"
 #include "rgw_sync_module.h"
+#include "rgw_sync_log_trim.h"
 
 #include "cls/lock/cls_lock_client.h"
@@ -151,6 +152,40 @@ bool RGWReadDataSyncStatusMarkersCR::spawn_next()
   return true;
 }
 
+class RGWReadDataSyncRecoveringShardsCR : public RGWShardCollectCR {
+  static constexpr int MAX_CONCURRENT_SHARDS = 16;
+
+  RGWDataSyncEnv *env;
+
+  uint64_t max_entries;
+  int num_shards;
+  int shard_id{0};
+
+  string marker;
+  map<int, std::set<std::string>> &entries_map;
+
+ public:
+  RGWReadDataSyncRecoveringShardsCR(RGWDataSyncEnv *env, uint64_t _max_entries, int _num_shards,
+                                    map<int, std::set<std::string>>& _entries_map)
+    : RGWShardCollectCR(env->cct, MAX_CONCURRENT_SHARDS), env(env),
+      max_entries(_max_entries), num_shards(_num_shards), entries_map(_entries_map)
+  {}
+  bool spawn_next() override;
+};
+
+bool RGWReadDataSyncRecoveringShardsCR::spawn_next()
+{
+  if (shard_id >= num_shards)
+    return false;
+
+  string error_oid = RGWDataSyncStatusManager::shard_obj_name(env->source_zone, shard_id) + ".retry";
+  spawn(new RGWRadosGetOmapKeysCR(env->store, rgw_raw_obj(env->store->get_zone_params().log_pool, error_oid),
+                                  marker, &entries_map[shard_id], max_entries), false);
+
+  ++shard_id;
+  return true;
+}
+
 class RGWReadDataSyncStatusCoroutine : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
   rgw_data_sync_status *sync_status;
@@ -613,7 +648,8 @@ int RGWRemoteDataLog::read_source_log_shards_next(map<int, string> shard_markers
 int RGWRemoteDataLog::init(const string& _source_zone, RGWRESTConn *_conn, RGWSyncErrorLogger *_error_logger,
                            RGWSyncModuleInstanceRef& _sync_module)
 {
-  sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger, _source_zone, _sync_module);
+  sync_env.init(store->ctx(), store, _conn, async_rados, &http_manager, _error_logger,
+                _source_zone, _sync_module, observer);
 
   if (initialized) {
     return 0;
@@ -652,9 +688,39 @@ int RGWRemoteDataLog::read_sync_status(rgw_data_sync_status *sync_status)
   return ret;
 }
 
+int RGWRemoteDataLog::read_recovering_shards(const int num_shards, set<int>& recovering_shards)
+{
+  // cannot run concurrently with run_sync(), so run in a separate manager
+  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
+  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
+  int ret = http_manager.set_threaded();
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
+    return ret;
+  }
+  RGWDataSyncEnv sync_env_local = sync_env;
+  sync_env_local.http_manager = &http_manager;
+  map<int, std::set<std::string>> entries_map;
+  uint64_t max_entries{1};
+  ret = crs.run(new RGWReadDataSyncRecoveringShardsCR(&sync_env_local, max_entries, num_shards, entries_map));
+  http_manager.stop();
+
+  if (ret == 0) {
+    for (const auto& entry : entries_map) {
+      if (entry.second.size() != 0) {
+        recovering_shards.insert(entry.first);
+      }
+    }
+  }
+
+  return ret;
+}
+
 int RGWRemoteDataLog::init_sync_status(int num_shards)
 {
   rgw_data_sync_status sync_status;
+  sync_status.sync_info.num_shards = num_shards;
   RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
   RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
   int ret = http_manager.set_threaded();
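
Illustrative sketch (not part of the diff): a minimal caller for the new
RGWRemoteDataLog::read_recovering_shards() entry point above. The helper
function and its output format are assumptions; only the API it calls comes
from this change.

  #include <iostream>
  #include <set>

  // hypothetical helper: report datalog shards that still have .retry entries
  void report_recovering_shards(RGWRemoteDataLog& datalog, int num_shards)
  {
    std::set<int> recovering;
    int r = datalog.read_recovering_shards(num_shards, recovering);
    if (r < 0) {
      std::cerr << "read_recovering_shards() failed: r=" << r << std::endl;
      return;
    }
    for (int shard : recovering) {
      std::cout << "datalog shard " << shard << " is recovering" << std::endl;
    }
  }
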
@@ -735,19 +801,20 @@ public:
   int operate() override {
     reenter(this) {
-      entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards,
-                                                  store->get_zone_params().log_pool,
-                                                  oid_prefix);
       yield {
         string entrypoint = string("/admin/metadata/bucket.instance");
         /* FIXME: need a better scaling solution here, requires streaming output */
         call(new RGWReadRESTResourceCR<list<string> >(store->ctx(), sync_env->conn, sync_env->http_manager,
                                                       entrypoint, NULL, &result));
       }
-      if (get_ret_status() < 0) {
+      if (retcode < 0) {
         ldout(sync_env->cct, 0) << "ERROR: failed to fetch metadata for section bucket.index" << dendl;
-        return set_state(RGWCoroutine_Error);
+        return set_cr_error(retcode);
       }
+      entries_index = new RGWShardedOmapCRManager(sync_env->async_rados, store, this, num_shards,
+                                                  store->get_zone_params().log_pool,
+                                                  oid_prefix);
+      yield; // yield so OmapAppendCRs can start
       for (iter = result.begin(); iter != result.end(); ++iter) {
         ldout(sync_env->cct, 20) << "list metadata: section=bucket.index key=" << *iter << dendl;
@@ -984,10 +1051,13 @@ public:
       }
       if (sync_status < 0) {
-        yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", raw_key,
-                                                        -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
-        if (retcode < 0) {
-          ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure: retcode=" << retcode << dendl;
+        // write actual sync failures for 'radosgw-admin sync error list'
+        if (sync_status != -EBUSY && sync_status != -EAGAIN) {
+          yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", raw_key,
+                                                          -sync_status, string("failed to sync bucket instance: ") + cpp_strerror(-sync_status)));
+          if (retcode < 0) {
+            ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure: retcode=" << retcode << dendl;
+          }
         }
         if (error_repo && !error_repo->append(raw_key)) {
           ldout(sync_env->store->ctx(), 0) << "ERROR: failed to log sync failure in error repo: retcode=" << retcode << dendl;
@@ -1000,6 +1070,9 @@ public:
                                           << error_repo->get_obj() << " retcode=" << retcode << dendl;
         }
       }
+      if (sync_env->observer) {
+        sync_env->observer->on_bucket_changed(bs.bucket.get_key());
+      }
       /* FIXME: what to do in case of error */
       if (marker_tracker && !entry_marker.empty()) {
         /* update marker */
@@ -1034,8 +1107,8 @@ class RGWDataSyncShardCR : public RGWCoroutine {
   uint32_t shard_id;
   rgw_data_sync_marker sync_marker;
 
-  map<string, bufferlist> entries;
-  map<string, bufferlist>::iterator iter;
+  std::set<std::string> entries;
+  std::set<std::string>::iterator iter;
 
   string oid;
@@ -1076,7 +1149,7 @@ class RGWDataSyncShardCR : public RGWCoroutine {
   string error_oid;
   RGWOmapAppend *error_repo;
-  map<string, bufferlist> error_entries;
+  std::set<std::string> error_entries;
   string error_marker;
   int max_error_entries;
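
The container change above drops payloads that were never used: the omap keys
alone name the entries, so map<string, bufferlist> becomes
std::set<std::string> and every iter->first becomes *iter. A self-contained
sketch of the same refactor, with std::string standing in for bufferlist:

  #include <iostream>
  #include <map>
  #include <set>
  #include <string>

  int main()
  {
    // before: keys carried an (empty, unused) payload value
    std::map<std::string, std::string> old_entries{{"mybucket:0", ""}};
    for (auto it = old_entries.begin(); it != old_entries.end(); ++it) {
      std::cout << it->first << '\n';  // entry name via ->first
    }

    // after: the key set alone identifies the entries
    std::set<std::string> new_entries{"mybucket:0"};
    for (auto it = new_entries.begin(); it != new_entries.end(); ++it) {
      std::cout << *it << '\n';        // entry name is the element itself
    }
    return 0;
  }
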
@@ -1194,20 +1267,20 @@ public:
         }
         iter = entries.begin();
         for (; iter != entries.end(); ++iter) {
-          ldout(sync_env->cct, 20) << __func__ << ": full sync: " << iter->first << dendl;
+          ldout(sync_env->cct, 20) << __func__ << ": full sync: " << *iter << dendl;
           total_entries++;
-          if (!marker_tracker->start(iter->first, total_entries, real_time())) {
-            ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << iter->first << ". Duplicate entry?" << dendl;
+          if (!marker_tracker->start(*iter, total_entries, real_time())) {
+            ldout(sync_env->cct, 0) << "ERROR: cannot start syncing " << *iter << ". Duplicate entry?" << dendl;
           } else {
             // fetch remote and write locally
-            yield spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, marker_tracker, error_repo, false), false);
+            yield spawn(new RGWDataSyncSingleEntryCR(sync_env, *iter, *iter, marker_tracker, error_repo, false), false);
             if (retcode < 0) {
               lease_cr->go_down();
               drain_all();
               return set_cr_error(retcode);
             }
           }
-          sync_marker.marker = iter->first;
+          sync_marker.marker = *iter;
         }
       } while ((int)entries.size() == max_entries);
@@ -1274,9 +1347,9 @@ public:
           ldout(sync_env->cct, 20) << __func__ << "(): read error repo, got " << error_entries.size() << " entries" << dendl;
           iter = error_entries.begin();
           for (; iter != error_entries.end(); ++iter) {
-            ldout(sync_env->cct, 20) << __func__ << "(): handle error entry: " << iter->first << dendl;
-            spawn(new RGWDataSyncSingleEntryCR(sync_env, iter->first, iter->first, nullptr /* no marker tracker */, error_repo, true), false);
-            error_marker = iter->first;
+            error_marker = *iter;
+            ldout(sync_env->cct, 20) << __func__ << "(): handle error entry: " << error_marker << dendl;
+            spawn(new RGWDataSyncSingleEntryCR(sync_env, error_marker, error_marker, nullptr /* no marker tracker */, error_repo, true), false);
           }
           if ((int)error_entries.size() != max_error_entries) {
             if (error_marker.empty() && error_entries.empty()) {
@@ -1480,6 +1553,7 @@ public:
       if ((rgw_data_sync_info::SyncState)sync_status.sync_info.state == rgw_data_sync_info::StateBuildingFullSyncMaps) {
         /* call sync module init here */
+        sync_status.sync_info.num_shards = num_shards;
         yield call(data_sync_module->init_sync(sync_env));
         if (retcode < 0) {
           ldout(sync_env->cct, 0) << "ERROR: sync module init_sync() failed, retcode=" << retcode << dendl;
@@ -1595,8 +1669,9 @@ class RGWDataSyncControlCR : public RGWBackoffControlCR
   RGWDataSyncEnv *sync_env;
   uint32_t num_shards;
 
+  static constexpr bool exit_on_error = false; // retry on all errors
 public:
-  RGWDataSyncControlCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards) : RGWBackoffControlCR(_sync_env->cct, true),
+  RGWDataSyncControlCR(RGWDataSyncEnv *_sync_env, uint32_t _num_shards) : RGWBackoffControlCR(_sync_env->cct, exit_on_error),
       sync_env(_sync_env), num_shards(_num_shards) {
   }
@@ -1670,7 +1745,9 @@ int RGWDataSyncStatusManager::init()
 
   RGWZoneParams& zone_params = store->get_zone_params();
 
-  sync_module = store->get_sync_module();
+  if (sync_module == nullptr) {
+    sync_module = store->get_sync_module();
+  }
 
   conn = store->get_zone_conn_by_id(source_zone);
   if (!conn) {
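
The constant introduced above documents the retry policy:
RGWBackoffControlCR(cct, false) keeps restarting the wrapped coroutine with
exponential backoff instead of exiting on the first error. A standalone
sketch of that control shape; run_once() is a hypothetical stand-in for the
wrapped sync coroutine, not an RGW API:

  #include <algorithm>
  #include <chrono>
  #include <functional>
  #include <thread>

  int run_with_backoff(const std::function<int()>& run_once,
                       int max_backoff_secs = 30)
  {
    int backoff_secs = 1;
    for (;;) {
      int r = run_once();
      if (r == 0) {
        return 0;  // finished cleanly
      }
      // exit_on_error == false: retry on any failure, with capped backoff
      std::this_thread::sleep_for(std::chrono::seconds(backoff_secs));
      backoff_secs = std::min(backoff_secs * 2, max_backoff_secs);
    }
  }
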
@@ -1736,35 +1813,22 @@ int RGWRemoteBucketLog::init(const string& _source_zone, RGWRESTConn *_conn,
   bs.bucket = bucket;
   bs.shard_id = shard_id;
 
-  sync_env.init(store->ctx(), store, conn, async_rados, http_manager, _error_logger, source_zone, _sync_module);
+  sync_env.init(store->ctx(), store, conn, async_rados, http_manager,
+                _error_logger, source_zone, _sync_module, nullptr);
 
   return 0;
 }
 
-struct bucket_index_marker_info {
-  string bucket_ver;
-  string master_ver;
-  string max_marker;
-  bool syncstopped{false};
-
-  void decode_json(JSONObj *obj) {
-    JSONDecoder::decode_json("bucket_ver", bucket_ver, obj);
-    JSONDecoder::decode_json("master_ver", master_ver, obj);
-    JSONDecoder::decode_json("max_marker", max_marker, obj);
-    JSONDecoder::decode_json("syncstopped", syncstopped, obj);
-  }
-};
-
 class RGWReadRemoteBucketIndexLogInfoCR : public RGWCoroutine {
   RGWDataSyncEnv *sync_env;
   const string instance_key;
-  bucket_index_marker_info *info;
+  rgw_bucket_index_marker_info *info;
 
 public:
   RGWReadRemoteBucketIndexLogInfoCR(RGWDataSyncEnv *_sync_env, const rgw_bucket_shard& bs,
-                                    bucket_index_marker_info *_info)
+                                    rgw_bucket_index_marker_info *_info)
     : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
       instance_key(bs.get_key()), info(_info) {}
@@ -1777,7 +1841,7 @@ public:
         { NULL, NULL } };
 
       string p = "/admin/log/";
-      call(new RGWReadRESTResourceCR<bucket_index_marker_info>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, info));
+      call(new RGWReadRESTResourceCR<rgw_bucket_index_marker_info>(sync_env->cct, sync_env->conn, sync_env->http_manager, p, pairs, info));
     }
     if (retcode < 0) {
       return set_cr_error(retcode);
@@ -1796,7 +1860,7 @@ class RGWInitBucketShardSyncStatusCoroutine : public RGWCoroutine {
 
   rgw_bucket_shard_sync_info& status;
 
-  bucket_index_marker_info info;
+  rgw_bucket_index_marker_info info;
 public:
   RGWInitBucketShardSyncStatusCoroutine(RGWDataSyncEnv *_sync_env,
                                         const rgw_bucket_shard& bs,
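
For reference, the struct removed above is not gone: it reappears under the
new name rgw_bucket_index_marker_info in a shared header (outside this file,
so not visible in this diff), with the same JSON decoding as the removed
lines:

  struct rgw_bucket_index_marker_info {
    string bucket_ver;
    string master_ver;
    string max_marker;
    bool syncstopped{false};

    void decode_json(JSONObj *obj) {
      JSONDecoder::decode_json("bucket_ver", bucket_ver, obj);
      JSONDecoder::decode_json("master_ver", master_ver, obj);
      JSONDecoder::decode_json("max_marker", max_marker, obj);
      JSONDecoder::decode_json("syncstopped", syncstopped, obj);
    }
  };
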
@@ -1920,6 +1984,171 @@ int RGWReadBucketSyncStatusCoroutine::operate()
   }
   return 0;
 }
+
+#define OMAP_READ_MAX_ENTRIES 10
+class RGWReadRecoveringBucketShardsCoroutine : public RGWCoroutine {
+  RGWDataSyncEnv *sync_env;
+  RGWRados *store;
+
+  const int shard_id;
+  int max_entries;
+
+  set<string>& recovering_buckets;
+  string marker;
+  string error_oid;
+
+  set<string> error_entries;
+  int max_omap_entries;
+  int count;
+
+public:
+  RGWReadRecoveringBucketShardsCoroutine(RGWDataSyncEnv *_sync_env, const int _shard_id,
+                                         set<string>& _recovering_buckets, const int _max_entries)
+    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+      store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries),
+      recovering_buckets(_recovering_buckets), max_omap_entries(OMAP_READ_MAX_ENTRIES)
+  {
+    error_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id) + ".retry";
+  }
+
+  int operate() override;
+};
+
+int RGWReadRecoveringBucketShardsCoroutine::operate()
+{
+  reenter(this) {
+    //read recovering bucket shards
+    count = 0;
+    do {
+      yield call(new RGWRadosGetOmapKeysCR(store, rgw_raw_obj(store->get_zone_params().log_pool, error_oid),
+                                           marker, &error_entries, max_omap_entries));
+
+      if (retcode == -ENOENT) {
+        break;
+      }
+
+      if (retcode < 0) {
+        ldout(sync_env->cct, 0) << "failed to read recovering bucket shards with "
+                                << cpp_strerror(retcode) << dendl;
+        return set_cr_error(retcode);
+      }
+
+      if (error_entries.empty()) {
+        break;
+      }
+
+      count += error_entries.size();
+      marker = *error_entries.rbegin();
+      recovering_buckets.insert(error_entries.begin(), error_entries.end());
+    } while ((int)error_entries.size() == max_omap_entries && count < max_entries);
+
+    return set_cr_done();
+  }
+
+  return 0;
+}
+
+class RGWReadPendingBucketShardsCoroutine : public RGWCoroutine {
+  RGWDataSyncEnv *sync_env;
+  RGWRados *store;
+
+  const int shard_id;
+  int max_entries;
+
+  set<string>& pending_buckets;
+  string marker;
+  string status_oid;
+
+  rgw_data_sync_marker* sync_marker;
+  int count;
+
+  list<rgw_data_change_log_entry> log_entries;
+  bool truncated;
+
+public:
+  RGWReadPendingBucketShardsCoroutine(RGWDataSyncEnv *_sync_env, const int _shard_id,
+                                      set<string>& _pending_buckets,
+                                      rgw_data_sync_marker* _sync_marker, const int _max_entries)
+    : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
+      store(sync_env->store), shard_id(_shard_id), max_entries(_max_entries),
+      pending_buckets(_pending_buckets), sync_marker(_sync_marker)
+  {
+    status_oid = RGWDataSyncStatusManager::shard_obj_name(sync_env->source_zone, shard_id);
+  }
+
+  int operate() override;
+};
+
+int RGWReadPendingBucketShardsCoroutine::operate()
+{
+  reenter(this) {
+    //read sync status marker
+    using CR = RGWSimpleRadosReadCR<rgw_data_sync_marker>;
+    yield call(new CR(sync_env->async_rados, store,
+                      rgw_raw_obj(store->get_zone_params().log_pool, status_oid),
+                      sync_marker));
+    if (retcode < 0) {
+      ldout(sync_env->cct, 0) << "failed to read sync status marker with "
+                              << cpp_strerror(retcode) << dendl;
+      return set_cr_error(retcode);
+    }
+
+    //read pending bucket shards
+    marker = sync_marker->marker;
+    count = 0;
+    do {
+      yield call(new RGWReadRemoteDataLogShardCR(sync_env, shard_id, &marker, &log_entries, &truncated));
+
+      if (retcode == -ENOENT) {
+        break;
+      }
+
+      if (retcode < 0) {
+        ldout(sync_env->cct, 0) << "failed to read remote data log info with "
+                                << cpp_strerror(retcode) << dendl;
+        return set_cr_error(retcode);
+      }
+
+      if (log_entries.empty()) {
+        break;
+      }
+
+      count += log_entries.size();
+      for (const auto& entry : log_entries) {
+        pending_buckets.insert(entry.entry.key);
+      }
+    } while (truncated && count < max_entries);
+
+    return set_cr_done();
+  }
+
+  return 0;
+}
+
+int RGWRemoteDataLog::read_shard_status(int shard_id, set<string>& pending_buckets, set<string>& recovering_buckets, rgw_data_sync_marker *sync_marker, const int max_entries)
+{
+  // cannot run concurrently with run_sync(), so run in a separate manager
+  RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry());
+  RGWHTTPManager http_manager(store->ctx(), crs.get_completion_mgr());
+  int ret = http_manager.set_threaded();
+  if (ret < 0) {
+    ldout(store->ctx(), 0) << "failed in http_manager.set_threaded() ret=" << ret << dendl;
+    return ret;
+  }
+  RGWDataSyncEnv sync_env_local = sync_env;
+  sync_env_local.http_manager = &http_manager;
+  list<RGWCoroutinesStack *> stacks;
+  RGWCoroutinesStack* recovering_stack = new RGWCoroutinesStack(store->ctx(), &crs);
+  recovering_stack->call(new RGWReadRecoveringBucketShardsCoroutine(&sync_env_local, shard_id, recovering_buckets, max_entries));
+  stacks.push_back(recovering_stack);
+  RGWCoroutinesStack* pending_stack = new RGWCoroutinesStack(store->ctx(), &crs);
+  pending_stack->call(new RGWReadPendingBucketShardsCoroutine(&sync_env_local, shard_id, pending_buckets, sync_marker, max_entries));
+  stacks.push_back(pending_stack);
+  ret = crs.run(stacks);
+  http_manager.stop();
+  return ret;
+}
+
 RGWCoroutine *RGWRemoteBucketLog::read_sync_status_cr(rgw_bucket_shard_sync_info *sync_status)
 {
   return new RGWReadBucketSyncStatusCoroutine(&sync_env, bs, sync_status);
@@ -2297,7 +2526,7 @@ public:
         sync_status = retcode;
       }
       if (!error_ss.str().empty()) {
-        yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", error_ss.str(), -retcode, "failed to sync object"));
+        yield call(sync_env->error_logger->log_error_cr(sync_env->conn->get_remote_id(), "data", error_ss.str(), -retcode, string("failed to sync object") + cpp_strerror(-sync_status)));
       }
 done:
       if (sync_status == 0) {
@@ -2538,6 +2767,9 @@ int RGWBucketShardIncrementalSyncCR::operate()
         syncstopped = false;
         continue;
       }
+      if (e.op == CLS_RGW_OP_CANCEL) {
+        continue;
+      }
       if (e.state != CLS_RGW_STATE_COMPLETE) {
         continue;
       }
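
The new CLS_RGW_OP_CANCEL check above makes incremental bucket sync skip
bilog entries for cancelled operations, in addition to entries that never
reached CLS_RGW_STATE_COMPLETE. A simplified, self-contained model of the
filter; the enums are stand-ins for the cls_rgw types, not the real ones:

  #include <cassert>

  enum class Op { Write, Del, Cancel /* CLS_RGW_OP_CANCEL */ };
  enum class State { Pending, Complete /* CLS_RGW_STATE_COMPLETE */ };

  struct BiLogEntry {
    Op op;
    State state;
  };

  bool should_apply(const BiLogEntry& e)
  {
    if (e.op == Op::Cancel) {
      return false;  // the op was cancelled on the source; nothing to replay
    }
    if (e.state != State::Complete) {
      return false;  // still pending; completion will be logged separately
    }
    return true;
  }

  int main()
  {
    assert(!should_apply({Op::Cancel, State::Complete}));
    assert(!should_apply({Op::Write, State::Pending}));
    assert(should_apply({Op::Write, State::Complete}));
    return 0;
  }
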
+ source_zone + ":" + bs.get_key(); } +class RGWCollectBucketSyncStatusCR : public RGWShardCollectCR { + static constexpr int max_concurrent_shards = 16; + RGWRados *const store; + RGWDataSyncEnv *const env; + const int num_shards; + rgw_bucket_shard bs; + + using Vector = std::vector; + Vector::iterator i, end; + + public: + RGWCollectBucketSyncStatusCR(RGWRados *store, RGWDataSyncEnv *env, + int num_shards, const rgw_bucket& bucket, + Vector *status) + : RGWShardCollectCR(store->ctx(), max_concurrent_shards), + store(store), env(env), num_shards(num_shards), + bs(bucket, num_shards > 0 ? 0 : -1), // start at shard 0 or -1 + i(status->begin()), end(status->end()) + {} + + bool spawn_next() override { + if (i == end) { + return false; + } + spawn(new RGWReadBucketSyncStatusCoroutine(env, bs, &*i), false); + ++i; + ++bs.shard_id; + return true; + } +}; + +int rgw_bucket_sync_status(RGWRados *store, const std::string& source_zone, + const RGWBucketInfo& bucket_info, + std::vector *status) +{ + const auto num_shards = bucket_info.num_shards; + status->clear(); + status->resize(std::max(1, num_shards)); + + RGWDataSyncEnv env; + RGWSyncModuleInstanceRef module; // null sync module + env.init(store->ctx(), store, nullptr, store->get_async_rados(), + nullptr, nullptr, source_zone, module, nullptr); + + RGWCoroutinesManager crs(store->ctx(), store->get_cr_registry()); + return crs.run(new RGWCollectBucketSyncStatusCR(store, &env, num_shards, + bucket_info.bucket, status)); +} + // TODO: move into rgw_data_sync_trim.cc #undef dout_prefix