// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
+// vim: ts=8 sw=2 smarttab ft=cpp
#include "rgw_b64.h"
#include "rgw_common.h"
using ESVersion = std::pair<int,int>;
static constexpr ESVersion ES_V5{5,0};
+static constexpr ESVersion ES_V7{7,0};
struct ESInfo {
std::string name;
uint32_t num_shards{0};
uint32_t num_replicas{0};
std::map <string,string> default_headers = {{ "Content-Type", "application/json" }};
+ ESInfo es_info;
void init(CephContext *cct, const JSONFormattable& config) {
string elastic_endpoint = config["endpoint"];
}
string get_obj_path(const RGWBucketInfo& bucket_info, const rgw_obj_key& key) {
- return index_path + "/object/" + url_encode(bucket_info.bucket.bucket_id + ":" + key.name + ":" + (key.instance.empty() ? "null" : key.instance));
+ if (es_info.version >= ES_V7) {
+ return index_path+ "/_doc/" + url_encode(bucket_info.bucket.bucket_id + ":" + key.name + ":" + (key.instance.empty() ? "null" : key.instance));
+;
+ } else {
+ return index_path + "/object/" + url_encode(bucket_info.bucket.bucket_id + ":" + key.name + ":" + (key.instance.empty() ? "null" : key.instance));
+ }
}
bool should_handle_operation(RGWBucketInfo& bucket_info) {
template <class T>
struct es_index_mappings {
+ ESVersion es_version;
ESType string_type {ESType::String};
+ es_index_mappings(ESVersion esv):es_version(esv) {
+ }
+
es_type<T> est(ESType t) const {
return es_type<T>(t);
}
}
void dump(Formatter *f) const {
- f->open_object_section("object");
+ if (es_version <= ES_V7)
+ f->open_object_section("object");
f->open_object_section("properties");
encode_json("bucket", est(string_type), f);
encode_json("name", est(string_type), f);
f->close_section(); // properties
f->close_section(); // meta
f->close_section(); // properties
+
+ if (es_version <= ES_V7)
f->close_section(); // object
}
};
es_index_settings settings;
es_index_mappings<T> mappings;
- es_index_config(es_index_settings& _s) : settings(_s) {}
+ es_index_config(es_index_settings& _s, ESVersion esv) : settings(_s), mappings(esv) {
+ }
void dump(Formatter *f) const {
encode_json("settings", settings, f);
}
};
-class RGWElasticInitConfigCBCR : public RGWCoroutine {
- RGWDataSyncEnv *sync_env;
- ElasticConfigRef conf;
- ESInfo es_info;
-
- struct _err_response {
- struct err_reason {
- vector<err_reason> root_cause;
- string type;
- string reason;
- string index;
-
- void decode_json(JSONObj *obj) {
- JSONDecoder::decode_json("root_cause", root_cause, obj);
- JSONDecoder::decode_json("type", type, obj);
- JSONDecoder::decode_json("reason", reason, obj);
- JSONDecoder::decode_json("index", index, obj);
- }
- } error;
-
- void decode_json(JSONObj *obj) {
- JSONDecoder::decode_json("error", error, obj);
- }
- } err_response;
-
// Coroutine that reads the REST resource "/" through conf->conn and hands
// &(conf->es_info) to RGWReadRESTResourceCR<ESInfo> as the result pointer, so
// the fetched ESInfo ends up in the shared config rather than in a
// coroutine-local copy.
// Outcome: a negative retcode is logged and returned via set_cr_error();
// otherwise the version string is logged and the coroutine is marked done.
+class RGWElasticGetESInfoCBCR : public RGWCoroutine {
public:
- RGWElasticInitConfigCBCR(RGWDataSyncEnv *_sync_env,
- ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct),
- sync_env(_sync_env),
+ RGWElasticGetESInfoCBCR(RGWDataSyncCtx *_sc,
+ ElasticConfigRef _conf) : RGWCoroutine(_sc->cct),
+ sc(_sc), sync_env(_sc->env),
conf(_conf) {}
int operate() override {
reenter(this) {
- ldout(sync_env->cct, 0) << ": init elasticsearch config zone=" << sync_env->source_zone << dendl;
+ ldout(sync_env->cct, 5) << conf->id << ": get elasticsearch info for zone: " << sc->source_zone << dendl;
// The request carries conf->default_headers and no query params.
yield call(new RGWReadRESTResourceCR<ESInfo> (sync_env->cct,
conf->conn.get(),
sync_env->http_manager,
"/", nullptr /*params*/,
&(conf->default_headers),
- &es_info));
+ &(conf->es_info)));
if (retcode < 0) {
+ ldout(sync_env->cct, 5) << conf->id << ": get elasticsearch failed: " << retcode << dendl;
return set_cr_error(retcode);
}
+ ldout(sync_env->cct, 5) << conf->id << ": got elastic version=" << conf->es_info.get_version_str() << dendl;
+ return set_cr_done();
+ }
+ return 0;
+ }
+private:
// sync_env is taken from sc->env in the constructor.
+ RGWDataSyncCtx *sc;
+ RGWDataSyncEnv *sync_env;
+ ElasticConfigRef conf;
+};
+
// Coroutine that PUTs the index configuration to conf->get_index_path().
// The request body is an es_index_config built from conf->num_replicas,
// conf->num_shards and conf->es_info.version: es_type_v5 mappings when the
// version is >= ES_V5, es_type_v2 mappings otherwise.
// It reads conf->es_info, which RGWElasticGetESInfoCBCR writes.
+class RGWElasticPutIndexCBCR : public RGWCoroutine {
+public:
+ RGWElasticPutIndexCBCR(RGWDataSyncCtx *_sc,
+ ElasticConfigRef _conf) : RGWCoroutine(_sc->cct),
+ sc(_sc), sync_env(_sc->env),
+ conf(_conf) {}
+ int operate() override {
+ reenter(this) {
+ ldout(sc->cct, 5) << conf->id << ": put elasticsearch index for zone: " << sc->source_zone << dendl;
+
yield {
string path = conf->get_index_path();
- ldout(sync_env->cct, 5) << "got elastic version=" << es_info.get_version_str() << dendl;
-
es_index_settings settings(conf->num_replicas, conf->num_shards);
-
std::unique_ptr<es_index_config_base> index_conf;
- if (es_info.version >= ES_V5) {
- ldout(sync_env->cct, 0) << "elasticsearch: index mapping: version >= 5" << dendl;
- index_conf.reset(new es_index_config<es_type_v5>(settings));
// The detected version is forwarded to the index config in both branches.
+ if (conf->es_info.version >= ES_V5) {
+ ldout(sc->cct, 0) << "elasticsearch: index mapping: version >= 5" << dendl;
+ index_conf.reset(new es_index_config<es_type_v5>(settings, conf->es_info.version));
} else {
- ldout(sync_env->cct, 0) << "elasticsearch: index mapping: version < 5" << dendl;
- index_conf.reset(new es_index_config<es_type_v2>(settings));
+ ldout(sc->cct, 0) << "elasticsearch: index mapping: version < 5" << dendl;
+ index_conf.reset(new es_index_config<es_type_v2>(settings, conf->es_info.version));
}
- call(new RGWPutRESTResourceCR<es_index_config_base, int, _err_response> (sync_env->cct,
+ call(new RGWPutRESTResourceCR<es_index_config_base, int, _err_response> (sc->cct,
conf->conn.get(),
sync_env->http_manager,
path, nullptr /*params*/,
*index_conf, nullptr, &err_response));
}
// A failure whose error type is "index_already_exists_exception" or
// "resource_already_exists_exception" is neither logged nor passed to
// set_cr_error() here; any other failure is logged and returned.
if (retcode < 0) {
- ldout(sync_env->cct, 0) << "elasticsearch: failed to initialize index: response.type=" << err_response.error.type << " response.reason=" << err_response.error.reason << dendl;
- if (err_response.error.type != "index_already_exists_exception") {
+ if (err_response.error.type != "index_already_exists_exception" &&
+ err_response.error.type != "resource_already_exists_exception") {
+ ldout(sync_env->cct, 0) << "elasticsearch: failed to initialize index: response.type=" << err_response.error.type << " response.reason=" << err_response.error.reason << dendl;
return set_cr_error(retcode);
}
return 0;
}
+private:
+ RGWDataSyncCtx *sc;
+ RGWDataSyncEnv *sync_env;
+ ElasticConfigRef conf;
+
// Error payload handed to the PUT coroutine as &err_response.
// NOTE(review): presumably filled from the error response body by
// RGWPutRESTResourceCR; confirm against that class.
+ struct _err_response {
+ struct err_reason {
+ vector<err_reason> root_cause;
+ string type;
+ string reason;
+ string index;
+
// Reads the "root_cause", "type", "reason" and "index" keys of obj.
+ void decode_json(JSONObj *obj) {
+ JSONDecoder::decode_json("root_cause", root_cause, obj);
+ JSONDecoder::decode_json("type", type, obj);
+ JSONDecoder::decode_json("reason", reason, obj);
+ JSONDecoder::decode_json("index", index, obj);
+ }
+ } error;
+
// Reads the "error" key of obj into `error`.
+ void decode_json(JSONObj *obj) {
+ JSONDecoder::decode_json("error", error, obj);
+ }
+ } err_response;
+};
+
+// Coroutine that initializes the elasticsearch target in two steps, passing
+// the same sync context and config to both:
+//  1. RGWElasticGetESInfoCBCR, which stores the server info in conf->es_info;
+//  2. RGWElasticPutIndexCBCR, which reads conf->es_info.version to choose the
+//     index mapping.
+// A negative retcode from either step is returned through set_cr_error() and
+// ends the sequence; otherwise the coroutine finishes with set_cr_done().
+class RGWElasticInitConfigCBCR : public RGWCoroutine {
+ // Only the sync context and the config are needed here; both are simply
+ // forwarded to the child coroutines.
+ RGWDataSyncCtx *sc;
+ ElasticConfigRef conf;
+
+public:
+ RGWElasticInitConfigCBCR(RGWDataSyncCtx *_sc,
+ ElasticConfigRef _conf) : RGWCoroutine(_sc->cct),
+ sc(_sc),
+ conf(_conf) {}
+ int operate() override {
+ reenter(this) {
+
+ // Step 1: the version must be known before the index is created.
+ yield call(new RGWElasticGetESInfoCBCR(sc, conf));
+
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+
+ // Step 2: create the index using the version fetched above.
+ yield call(new RGWElasticPutIndexCBCR(sc, conf));
+ if (retcode < 0) {
+ return set_cr_error(retcode);
+ }
+ return set_cr_done();
+ }
+ return 0;
+ }
+
};
class RGWElasticHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
+ rgw_bucket_sync_pipe sync_pipe;
ElasticConfigRef conf;
uint64_t versioned_epoch;
public:
- RGWElasticHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
- RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
- ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), conf(_conf),
+ RGWElasticHandleRemoteObjCBCR(RGWDataSyncCtx *_sc,
+ rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
+ ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
+ sync_pipe(_sync_pipe), conf(_conf),
versioned_epoch(_versioned_epoch) {}
int operate() override {
reenter(this) {
- ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sync_env->source_zone
- << " b=" << bucket_info.bucket << " k=" << key
+ ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sc->source_zone
+ << " b=" << sync_pipe.info.source_bs.bucket << " k=" << key
<< " size=" << size << " mtime=" << mtime << dendl;
yield {
- string path = conf->get_obj_path(bucket_info, key);
- es_obj_metadata doc(sync_env->cct, conf, bucket_info, key, mtime, size, attrs, versioned_epoch);
+ string path = conf->get_obj_path(sync_pipe.dest_bucket_info, key);
+ es_obj_metadata doc(sync_env->cct, conf, sync_pipe.dest_bucket_info, key, mtime, size, attrs, versioned_epoch);
call(new RGWPutRESTResourceCR<es_obj_metadata, int>(sync_env->cct, conf->conn.get(),
sync_env->http_manager,
};
// RGWCallStatRemoteObjCR specialization for the elasticsearch module.
// It keeps its own copy of the sync pipe, the config and the versioned epoch,
// and passes them to the RGWElasticHandleRemoteObjCBCR callback it allocates.
class RGWElasticHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
+ rgw_bucket_sync_pipe sync_pipe;
ElasticConfigRef conf;
uint64_t versioned_epoch;
public:
// The base class is constructed with the pipe's source bucket
// (_sync_pipe.info.source_bs.bucket) and the object key.
- RGWElasticHandleRemoteObjCR(RGWDataSyncEnv *_sync_env,
- RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
- ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key),
+ RGWElasticHandleRemoteObjCR(RGWDataSyncCtx *_sc,
+ rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
+ ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
+ sync_pipe(_sync_pipe),
conf(_conf), versioned_epoch(_versioned_epoch) {
}
~RGWElasticHandleRemoteObjCR() override {}
// Returns a newly allocated callback coroutine (raw `new`).
// NOTE(review): ownership presumably passes to the caller; confirm in
// RGWCallStatRemoteObjCR.
RGWStatRemoteObjCBCR *allocate_callback() override {
- return new RGWElasticHandleRemoteObjCBCR(sync_env, bucket_info, key, conf, versioned_epoch);
+ return new RGWElasticHandleRemoteObjCBCR(sc, sync_pipe, key, conf, versioned_epoch);
}
};
class RGWElasticRemoveRemoteObjCBCR : public RGWCoroutine {
+ RGWDataSyncCtx *sc;
RGWDataSyncEnv *sync_env;
- RGWBucketInfo bucket_info;
+ rgw_bucket_sync_pipe sync_pipe;
rgw_obj_key key;
ceph::real_time mtime;
ElasticConfigRef conf;
public:
- RGWElasticRemoveRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
- RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime,
- ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
- bucket_info(_bucket_info), key(_key),
+ RGWElasticRemoveRemoteObjCBCR(RGWDataSyncCtx *_sc,
+ rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, const ceph::real_time& _mtime,
+ ElasticConfigRef _conf) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
+ sync_pipe(_sync_pipe), key(_key),
mtime(_mtime), conf(_conf) {}
int operate() override {
reenter(this) {
- ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone
- << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl;
+ ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sc->source_zone
+ << " b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << dendl;
yield {
- string path = conf->get_obj_path(bucket_info, key);
+ string path = conf->get_obj_path(sync_pipe.dest_bucket_info, key);
call(new RGWDeleteRESTResourceCR(sync_env->cct, conf->conn.get(),
sync_env->http_manager,
}
~RGWElasticDataSyncModule() override {}
- void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) override {
- conf->init_instance(sync_env->store->svc.zone->get_realm(), instance_id);
// Initialize the config instance with the realm obtained from
// sc->env->svc->zone and the given instance_id.
+ void init(RGWDataSyncCtx *sc, uint64_t instance_id) override {
+ conf->init_instance(sc->env->svc->zone->get_realm(), instance_id);
}
- RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) override {
- ldout(sync_env->cct, 5) << conf->id << ": init" << dendl;
- return new RGWElasticInitConfigCBCR(sync_env, conf);
// Log the init at level 5 and return a newly allocated
// RGWElasticInitConfigCBCR for this sync context and config.
+ RGWCoroutine *init_sync(RGWDataSyncCtx *sc) override {
+ ldout(sc->cct, 5) << conf->id << ": init" << dendl;
+ return new RGWElasticInitConfigCBCR(sc, conf);
}
- RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
- ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
- if (!conf->should_handle_operation(bucket_info)) {
- ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
+
// Log the start at level 5 and return a newly allocated
// RGWElasticGetESInfoCBCR, which stores the fetched server info in
// conf->es_info. Unlike init_sync(), no index is created here.
+ RGWCoroutine *start_sync(RGWDataSyncCtx *sc) override {
+ ldout(sc->cct, 5) << conf->id << ": start_sync" << dendl;
+ // try to get elastic search version
+ return new RGWElasticGetESInfoCBCR(sc, conf);
+ }
+
// Return a coroutine that handles `key` for this sync pipe, or nullptr when
// conf->should_handle_operation() rejects the destination bucket.
// A missing versioned_epoch is passed on as 0. zones_trace is not used in
// this method.
+ RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
+ ldout(sc->cct, 10) << conf->id << ": sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
// The log line above prints the source bucket; the check below is made
// against the destination bucket info.
+ if (!conf->should_handle_operation(sync_pipe.dest_bucket_info)) {
+ ldout(sc->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
return nullptr;
}
- return new RGWElasticHandleRemoteObjCR(sync_env, bucket_info, key, conf, versioned_epoch.value_or(0));
+ return new RGWElasticHandleRemoteObjCR(sc, sync_pipe, key, conf, versioned_epoch.value_or(0));
}
- RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
// Return a coroutine that removes the document for `key`, or nullptr when
// conf->should_handle_operation() rejects the destination bucket.
// versioned and versioned_epoch are only logged; zones_trace is not used in
// this method.
+ RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
/* versioned and versioned epoch params are useless in the elasticsearch backend case */
- ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
- if (!conf->should_handle_operation(bucket_info)) {
- ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
+ ldout(sc->cct, 10) << conf->id << ": rm_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
+ if (!conf->should_handle_operation(sync_pipe.dest_bucket_info)) {
+ ldout(sc->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
return nullptr;
}
- return new RGWElasticRemoveRemoteObjCBCR(sync_env, bucket_info, key, mtime, conf);
+ return new RGWElasticRemoveRemoteObjCBCR(sc, sync_pipe, key, mtime, conf);
}
- RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
// Delete markers are not handled by this module: the call is logged at
// level 10 and no coroutine is returned (always NULL).
+ RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
- ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
+ ldout(sc->cct, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime
<< " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
- ldout(sync_env->cct, 10) << conf->id << ": skipping operation (not handled)" << dendl;
+ ldout(sc->cct, 10) << conf->id << ": skipping operation (not handled)" << dendl;
return NULL;
}
RGWRESTConn *get_rest_conn() {