]> git.proxmox.com Git - ceph.git/blobdiff - ceph/src/rgw/rgw_sync_module_es.cc
import ceph 15.2.10
[ceph.git] / ceph / src / rgw / rgw_sync_module_es.cc
index f9b02a4027bbfa917ef413f3ca25600f1a90515d..d7e5dca28ffbc6b58fbab9d2afff598a70d270eb 100644 (file)
@@ -1,5 +1,5 @@
 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
-// vim: ts=8 sw=2 smarttab
+// vim: ts=8 sw=2 smarttab ft=cpp
 
 #include "rgw_b64.h"
 #include "rgw_common.h"
@@ -112,6 +112,7 @@ public:
 
 using ESVersion = std::pair<int,int>;
 static constexpr ESVersion ES_V5{5,0};
+static constexpr ESVersion ES_V7{7,0};
 
 struct ESInfo {
   std::string name;
@@ -171,6 +172,7 @@ struct ElasticConfig {
   uint32_t num_shards{0};
   uint32_t num_replicas{0};
   std::map <string,string> default_headers = {{ "Content-Type", "application/json" }};
+  ESInfo es_info;
 
   void init(CephContext *cct, const JSONFormattable& config) {
     string elastic_endpoint = config["endpoint"];
@@ -216,7 +218,12 @@ struct ElasticConfig {
   }
 
   string get_obj_path(const RGWBucketInfo& bucket_info, const rgw_obj_key& key) {
-    return index_path +  "/object/" + url_encode(bucket_info.bucket.bucket_id + ":" + key.name + ":" + (key.instance.empty() ? "null" : key.instance));
+    if (es_info.version >= ES_V7) {
+      return index_path+ "/_doc/" + url_encode(bucket_info.bucket.bucket_id + ":" + key.name + ":" + (key.instance.empty() ? "null" : key.instance));
+;
+    } else {
+      return index_path +  "/object/" + url_encode(bucket_info.bucket.bucket_id + ":" + key.name + ":" + (key.instance.empty() ? "null" : key.instance));
+    }
   }
 
   bool should_handle_operation(RGWBucketInfo& bucket_info) {
@@ -328,8 +335,12 @@ struct es_type : public T {
 
 template <class T>
 struct es_index_mappings {
+  ESVersion es_version;
   ESType string_type {ESType::String};
 
+  es_index_mappings(ESVersion esv):es_version(esv) {
+  }
+
   es_type<T> est(ESType t) const {
     return es_type<T>(t);
   }
@@ -345,7 +356,8 @@ struct es_index_mappings {
   }
 
   void dump(Formatter *f) const {
-    f->open_object_section("object");
+    if (es_version <= ES_V7)
+      f->open_object_section("object");
     f->open_object_section("properties");
     encode_json("bucket", est(string_type), f);
     encode_json("name", est(string_type), f);
@@ -370,6 +382,8 @@ struct es_index_mappings {
     f->close_section(); // properties
     f->close_section(); // meta
     f->close_section(); // properties
+
+    if (es_version <= ES_V7)
     f->close_section(); // object
   }
 };
@@ -396,7 +410,8 @@ struct es_index_config : public es_index_config_base {
   es_index_settings settings;
   es_index_mappings<T> mappings;
 
-  es_index_config(es_index_settings& _s) : settings(_s) {}
+  es_index_config(es_index_settings& _s, ESVersion esv) : settings(_s), mappings(esv) {
+  }
 
   void dump(Formatter *f) const {
     encode_json("settings", settings, f);
@@ -626,65 +641,60 @@ struct es_obj_metadata {
   }
 };
 
-class RGWElasticInitConfigCBCR : public RGWCoroutine {
-  RGWDataSyncEnv *sync_env;
-  ElasticConfigRef conf;
-  ESInfo es_info;
-
-  struct _err_response {
-    struct err_reason {
-      vector<err_reason> root_cause;
-      string type;
-      string reason;
-      string index;
-
-      void decode_json(JSONObj *obj) {
-        JSONDecoder::decode_json("root_cause", root_cause, obj);
-        JSONDecoder::decode_json("type", type, obj);
-        JSONDecoder::decode_json("reason", reason, obj);
-        JSONDecoder::decode_json("index", index, obj);
-      }
-    } error;
-
-    void decode_json(JSONObj *obj) {
-      JSONDecoder::decode_json("error", error, obj);
-    }
-  } err_response;
-
+class RGWElasticGetESInfoCBCR : public RGWCoroutine {
 public:
-  RGWElasticInitConfigCBCR(RGWDataSyncEnv *_sync_env,
-                          ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct),
-                                                    sync_env(_sync_env),
+  RGWElasticGetESInfoCBCR(RGWDataSyncCtx *_sc, 
+                          ElasticConfigRef _conf) : RGWCoroutine(_sc->cct),
+                                                    sc(_sc), sync_env(_sc->env),
                                                     conf(_conf) {}
   int operate() override {
     reenter(this) {
-      ldout(sync_env->cct, 0) << ": init elasticsearch config zone=" << sync_env->source_zone << dendl;
+      ldout(sync_env->cct, 5) << conf->id << ": get elasticsearch info for zone: " << sc->source_zone << dendl;
       yield call(new RGWReadRESTResourceCR<ESInfo> (sync_env->cct,
                                                     conf->conn.get(),
                                                     sync_env->http_manager,
                                                     "/", nullptr /*params*/,
                                                     &(conf->default_headers),
-                                                    &es_info));
+                                                    &(conf->es_info)));
       if (retcode < 0) {
+        ldout(sync_env->cct, 5) << conf->id << ": get elasticsearch failed: " << retcode << dendl;
         return set_cr_error(retcode);
       }
 
+      ldout(sync_env->cct, 5) << conf->id << ": got elastic version=" << conf->es_info.get_version_str() << dendl;
+      return set_cr_done();
+    }
+    return 0;
+  }
+private:
+  RGWDataSyncCtx *sc;
+  RGWDataSyncEnv *sync_env;
+  ElasticConfigRef conf;
+};
+
+class RGWElasticPutIndexCBCR : public RGWCoroutine {
+public:
+  RGWElasticPutIndexCBCR(RGWDataSyncCtx *_sc,
+                         ElasticConfigRef _conf) : RGWCoroutine(_sc->cct),
+                                                   sc(_sc), sync_env(_sc->env),
+                                                   conf(_conf) {}
+  int operate() override {
+    reenter(this) {
+      ldout(sc->cct, 5) << conf->id << ": put elasticsearch index for zone: " << sc->source_zone << dendl;
+
       yield {
         string path = conf->get_index_path();
-        ldout(sync_env->cct, 5) << "got elastic version=" << es_info.get_version_str() << dendl;
-
         es_index_settings settings(conf->num_replicas, conf->num_shards);
-
         std::unique_ptr<es_index_config_base> index_conf;
 
-        if (es_info.version >= ES_V5) {
-          ldout(sync_env->cct, 0) << "elasticsearch: index mapping: version >= 5" << dendl;
-          index_conf.reset(new es_index_config<es_type_v5>(settings));
+        if (conf->es_info.version >= ES_V5) {
+          ldout(sc->cct, 0) << "elasticsearch: index mapping: version >= 5" << dendl;
+          index_conf.reset(new es_index_config<es_type_v5>(settings, conf->es_info.version));
         } else {
-          ldout(sync_env->cct, 0) << "elasticsearch: index mapping: version < 5" << dendl;
-          index_conf.reset(new es_index_config<es_type_v2>(settings));
+          ldout(sc->cct, 0) << "elasticsearch: index mapping: version < 5" << dendl;
+          index_conf.reset(new es_index_config<es_type_v2>(settings, conf->es_info.version));
         }
-        call(new RGWPutRESTResourceCR<es_index_config_base, int, _err_response> (sync_env->cct,
+        call(new RGWPutRESTResourceCR<es_index_config_base, int, _err_response> (sc->cct,
                                                              conf->conn.get(),
                                                              sync_env->http_manager,
                                                              path, nullptr /*params*/,
@@ -692,9 +702,10 @@ public:
                                                              *index_conf, nullptr, &err_response));
       }
       if (retcode < 0) {
-        ldout(sync_env->cct, 0) << "elasticsearch: failed to initialize index: response.type=" << err_response.error.type << " response.reason=" << err_response.error.reason << dendl;
 
-        if (err_response.error.type != "index_already_exists_exception") {
+        if (err_response.error.type != "index_already_exists_exception" &&
+                 err_response.error.type != "resource_already_exists_exception") {
+          ldout(sync_env->cct, 0) << "elasticsearch: failed to initialize index: response.type=" << err_response.error.type << " response.reason=" << err_response.error.reason << dendl;
           return set_cr_error(retcode);
         }
 
@@ -705,25 +716,81 @@ public:
     return 0;
   }
 
+private:
+  RGWDataSyncCtx *sc;
+  RGWDataSyncEnv *sync_env;
+  ElasticConfigRef conf;
+
+    struct _err_response {
+    struct err_reason {
+      vector<err_reason> root_cause;
+      string type;
+      string reason;
+      string index;
+
+      void decode_json(JSONObj *obj) {
+        JSONDecoder::decode_json("root_cause", root_cause, obj);
+        JSONDecoder::decode_json("type", type, obj);
+        JSONDecoder::decode_json("reason", reason, obj);
+        JSONDecoder::decode_json("index", index, obj);
+      }
+    } error;
+
+    void decode_json(JSONObj *obj) {
+      JSONDecoder::decode_json("error", error, obj);
+    }
+  } err_response;
+};
+
+class RGWElasticInitConfigCBCR : public RGWCoroutine {
+  RGWDataSyncCtx *sc;
+  RGWDataSyncEnv *sync_env;
+  ElasticConfigRef conf;
+
+public:
+  RGWElasticInitConfigCBCR(RGWDataSyncCtx *_sc,
+                          ElasticConfigRef _conf) : RGWCoroutine(_sc->cct),
+                                                    sc(_sc), sync_env(_sc->env),
+                                                    conf(_conf) {}
+  int operate() override {
+    reenter(this) {
+
+      yield call(new RGWElasticGetESInfoCBCR(sc, conf));
+
+      if (retcode < 0) {
+        return set_cr_error(retcode);
+      }
+
+      yield call(new RGWElasticPutIndexCBCR(sc, conf));
+      if (retcode < 0) {
+          return set_cr_error(retcode);
+      }
+      return set_cr_done();
+    }
+    return 0;
+  }
+
 };
 
 class RGWElasticHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
+  rgw_bucket_sync_pipe sync_pipe;
   ElasticConfigRef conf;
   uint64_t versioned_epoch;
 public:
-  RGWElasticHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
-                          RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
-                          ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), conf(_conf),
+  RGWElasticHandleRemoteObjCBCR(RGWDataSyncCtx *_sc,
+                          rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
+                          ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWStatRemoteObjCBCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
+                                                                               sync_pipe(_sync_pipe), conf(_conf),
                                                                                versioned_epoch(_versioned_epoch) {}
   int operate() override {
     reenter(this) {
-      ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sync_env->source_zone
-                               << " b=" << bucket_info.bucket << " k=" << key
+      ldout(sync_env->cct, 10) << ": stat of remote obj: z=" << sc->source_zone
+                               << " b=" << sync_pipe.info.source_bs.bucket << " k=" << key
                                << " size=" << size << " mtime=" << mtime << dendl;
 
       yield {
-        string path = conf->get_obj_path(bucket_info, key);
-        es_obj_metadata doc(sync_env->cct, conf, bucket_info, key, mtime, size, attrs, versioned_epoch);
+        string path = conf->get_obj_path(sync_pipe.dest_bucket_info, key);
+        es_obj_metadata doc(sync_env->cct, conf, sync_pipe.dest_bucket_info, key, mtime, size, attrs, versioned_epoch);
 
         call(new RGWPutRESTResourceCR<es_obj_metadata, int>(sync_env->cct, conf->conn.get(),
                                                             sync_env->http_manager,
@@ -742,40 +809,43 @@ public:
 };
 
 class RGWElasticHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
+  rgw_bucket_sync_pipe sync_pipe;
   ElasticConfigRef conf;
   uint64_t versioned_epoch;
 public:
-  RGWElasticHandleRemoteObjCR(RGWDataSyncEnv *_sync_env,
-                        RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
-                        ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key),
+  RGWElasticHandleRemoteObjCR(RGWDataSyncCtx *_sc,
+                        rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key,
+                        ElasticConfigRef _conf, uint64_t _versioned_epoch) : RGWCallStatRemoteObjCR(_sc, _sync_pipe.info.source_bs.bucket, _key),
+                                                           sync_pipe(_sync_pipe),
                                                            conf(_conf), versioned_epoch(_versioned_epoch) {
   }
 
   ~RGWElasticHandleRemoteObjCR() override {}
 
   RGWStatRemoteObjCBCR *allocate_callback() override {
-    return new RGWElasticHandleRemoteObjCBCR(sync_env, bucket_info, key, conf, versioned_epoch);
+    return new RGWElasticHandleRemoteObjCBCR(sc, sync_pipe, key, conf, versioned_epoch);
   }
 };
 
 class RGWElasticRemoveRemoteObjCBCR : public RGWCoroutine {
+  RGWDataSyncCtx *sc;
   RGWDataSyncEnv *sync_env;
-  RGWBucketInfo bucket_info;
+  rgw_bucket_sync_pipe sync_pipe;
   rgw_obj_key key;
   ceph::real_time mtime;
   ElasticConfigRef conf;
 public:
-  RGWElasticRemoveRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
-                          RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime,
-                          ElasticConfigRef _conf) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
-                                                        bucket_info(_bucket_info), key(_key),
+  RGWElasticRemoveRemoteObjCBCR(RGWDataSyncCtx *_sc,
+                          rgw_bucket_sync_pipe& _sync_pipe, rgw_obj_key& _key, const ceph::real_time& _mtime,
+                          ElasticConfigRef _conf) : RGWCoroutine(_sc->cct), sc(_sc), sync_env(_sc->env),
+                                                        sync_pipe(_sync_pipe), key(_key),
                                                         mtime(_mtime), conf(_conf) {}
   int operate() override {
     reenter(this) {
-      ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sync_env->source_zone
-                               << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl;
+      ldout(sync_env->cct, 10) << ": remove remote obj: z=" << sc->source_zone
+                               << " b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << dendl;
       yield {
-        string path = conf->get_obj_path(bucket_info, key);
+        string path = conf->get_obj_path(sync_pipe.dest_bucket_info, key);
 
         call(new RGWDeleteRESTResourceCR(sync_env->cct, conf->conn.get(),
                                          sync_env->http_manager,
@@ -799,36 +869,43 @@ public:
   }
   ~RGWElasticDataSyncModule() override {}
 
-  void init(RGWDataSyncEnv *sync_env, uint64_t instance_id) override {
-    conf->init_instance(sync_env->store->svc.zone->get_realm(), instance_id);
+  void init(RGWDataSyncCtx *sc, uint64_t instance_id) override {
+    conf->init_instance(sc->env->svc->zone->get_realm(), instance_id);
   }
 
-  RGWCoroutine *init_sync(RGWDataSyncEnv *sync_env) override {
-    ldout(sync_env->cct, 5) << conf->id << ": init" << dendl;
-    return new RGWElasticInitConfigCBCR(sync_env, conf);
+  RGWCoroutine *init_sync(RGWDataSyncCtx *sc) override {
+    ldout(sc->cct, 5) << conf->id << ": init" << dendl;
+    return new RGWElasticInitConfigCBCR(sc, conf);
   }
-  RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
-    ldout(sync_env->cct, 10) << conf->id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
-    if (!conf->should_handle_operation(bucket_info)) {
-      ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
+
+  RGWCoroutine *start_sync(RGWDataSyncCtx *sc) override {
+    ldout(sc->cct, 5) << conf->id << ": start_sync" << dendl;
+    // try to get elastic search version
+    return new RGWElasticGetESInfoCBCR(sc, conf);
+  }
+
+  RGWCoroutine *sync_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, std::optional<uint64_t> versioned_epoch, rgw_zone_set *zones_trace) override {
+    ldout(sc->cct, 10) << conf->id << ": sync_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch.value_or(0) << dendl;
+    if (!conf->should_handle_operation(sync_pipe.dest_bucket_info)) {
+      ldout(sc->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
       return nullptr;
     }
-    return new RGWElasticHandleRemoteObjCR(sync_env, bucket_info, key, conf, versioned_epoch.value_or(0));
+    return new RGWElasticHandleRemoteObjCR(sc, sync_pipe, key, conf, versioned_epoch.value_or(0));
   }
-  RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
+  RGWCoroutine *remove_object(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
     /* versioned and versioned epoch params are useless in the elasticsearch backend case */
-    ldout(sync_env->cct, 10) << conf->id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
-    if (!conf->should_handle_operation(bucket_info)) {
-      ldout(sync_env->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
+    ldout(sc->cct, 10) << conf->id << ": rm_object: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
+    if (!conf->should_handle_operation(sync_pipe.dest_bucket_info)) {
+      ldout(sc->cct, 10) << conf->id << ": skipping operation (bucket not approved)" << dendl;
       return nullptr;
     }
-    return new RGWElasticRemoveRemoteObjCBCR(sync_env, bucket_info, key, mtime, conf);
+    return new RGWElasticRemoveRemoteObjCBCR(sc, sync_pipe, key, mtime, conf);
   }
-  RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
+  RGWCoroutine *create_delete_marker(RGWDataSyncCtx *sc, rgw_bucket_sync_pipe& sync_pipe, rgw_obj_key& key, real_time& mtime,
                                      rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch, rgw_zone_set *zones_trace) override {
-    ldout(sync_env->cct, 10) << conf->id << ": create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
+    ldout(sc->cct, 10) << conf->id << ": create_delete_marker: b=" << sync_pipe.info.source_bs.bucket << " k=" << key << " mtime=" << mtime
                             << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
-    ldout(sync_env->cct, 10) << conf->id << ": skipping operation (not handled)" << dendl;
+    ldout(sc->cct, 10) << conf->id << ": skipping operation (not handled)" << dendl;
     return NULL;
   }
   RGWRESTConn *get_rest_conn() {