]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_sync_module_es.cc
add subtree-ish sources for 12.0.3
[ceph.git] / ceph / src / rgw / rgw_sync_module_es.cc
CommitLineData
7c673cae
FG
1#include "rgw_common.h"
2#include "rgw_coroutine.h"
3#include "rgw_sync_module.h"
4#include "rgw_data_sync.h"
5#include "rgw_boost_asio_yield.h"
6#include "rgw_sync_module_es.h"
7#include "rgw_rest_conn.h"
8#include "rgw_cr_rest.h"
9
10#define dout_subsys ceph_subsys_rgw
11
12struct ElasticConfig {
13 string id;
14 RGWRESTConn *conn{nullptr};
15};
16
17static string es_get_obj_path(const RGWRealm& realm, const RGWBucketInfo& bucket_info, const rgw_obj_key& key)
18{
19 string path = "/rgw-" + realm.get_name() + "/object/" + bucket_info.bucket.bucket_id + ":" + key.name + ":" + key.instance;
20 return path;
21}
22
23struct es_obj_metadata {
24 CephContext *cct;
25 RGWBucketInfo bucket_info;
26 rgw_obj_key key;
27 ceph::real_time mtime;
28 uint64_t size;
29 map<string, bufferlist> attrs;
30
31 es_obj_metadata(CephContext *_cct, const RGWBucketInfo& _bucket_info,
32 const rgw_obj_key& _key, ceph::real_time& _mtime, uint64_t _size,
33 map<string, bufferlist>& _attrs) : cct(_cct), bucket_info(_bucket_info), key(_key),
34 mtime(_mtime), size(_size), attrs(std::move(_attrs)) {}
35
36 void dump(Formatter *f) const {
37 map<string, string> out_attrs;
38 map<string, string> custom_meta;
39 RGWAccessControlPolicy policy;
40 set<string> permissions;
41
42 for (auto i : attrs) {
43 const string& attr_name = i.first;
44 string name;
45 bufferlist& val = i.second;
46
47 if (attr_name.compare(0, sizeof(RGW_ATTR_PREFIX) - 1, RGW_ATTR_PREFIX) != 0) {
48 continue;
49 }
50
51 if (attr_name.compare(0, sizeof(RGW_ATTR_META_PREFIX) - 1, RGW_ATTR_META_PREFIX) == 0) {
52 name = attr_name.substr(sizeof(RGW_ATTR_META_PREFIX) - 1);
53 custom_meta[name] = string(val.c_str(), (val.length() > 0 ? val.length() - 1 : 0));
54 continue;
55 }
56
57 name = attr_name.substr(sizeof(RGW_ATTR_PREFIX) - 1);
58
59 if (name == "acl") {
60 try {
61 auto i = val.begin();
62 ::decode(policy, i);
63 } catch (buffer::error& err) {
64 ldout(cct, 0) << "ERROR: failed to decode acl for " << bucket_info.bucket << "/" << key << dendl;
65 }
66
67 const RGWAccessControlList& acl = policy.get_acl();
68
69 permissions.insert(policy.get_owner().get_id().to_str());
70 for (auto acliter : acl.get_grant_map()) {
71 const ACLGrant& grant = acliter.second;
72 if (grant.get_type().get_type() == ACL_TYPE_CANON_USER &&
73 ((uint32_t)grant.get_permission().get_permissions() & RGW_PERM_READ) != 0) {
74 rgw_user user;
75 if (grant.get_id(user)) {
76 permissions.insert(user.to_str());
77 }
78 }
79 }
80 } else {
81 if (name != "pg_ver" &&
82 name != "source_zone" &&
83 name != "idtag") {
84 out_attrs[name] = string(val.c_str(), (val.length() > 0 ? val.length() - 1 : 0));
85 }
86 }
87 }
88 ::encode_json("bucket", bucket_info.bucket.name, f);
89 ::encode_json("name", key.name, f);
90 ::encode_json("instance", key.instance, f);
91 ::encode_json("owner", policy.get_owner(), f);
92 ::encode_json("permissions", permissions, f);
93 f->open_object_section("meta");
94 ::encode_json("size", size, f);
95
96 string mtime_str;
97 rgw_to_iso8601(mtime, &mtime_str);
98 ::encode_json("mtime", mtime_str, f);
99 for (auto i : out_attrs) {
100 ::encode_json(i.first.c_str(), i.second, f);
101 }
102 if (!custom_meta.empty()) {
103 f->open_object_section("custom");
104 for (auto i : custom_meta) {
105 ::encode_json(i.first.c_str(), i.second, f);
106 }
107 f->close_section();
108 }
109 f->close_section();
110 }
111
112};
113
114class RGWElasticHandleRemoteObjCBCR : public RGWStatRemoteObjCBCR {
115 const ElasticConfig& conf;
116public:
117 RGWElasticHandleRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
118 RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
119 const ElasticConfig& _conf) : RGWStatRemoteObjCBCR(_sync_env, _bucket_info, _key), conf(_conf) {}
120 int operate() override {
121 reenter(this) {
122 ldout(sync_env->cct, 0) << ": stat of remote obj: z=" << sync_env->source_zone
123 << " b=" << bucket_info.bucket << " k=" << key << " size=" << size << " mtime=" << mtime
124 << " attrs=" << attrs << dendl;
125 yield {
126 string path = es_get_obj_path(sync_env->store->get_realm(), bucket_info, key);
127 es_obj_metadata doc(sync_env->cct, bucket_info, key, mtime, size, attrs);
128
129 call(new RGWPutRESTResourceCR<es_obj_metadata, int>(sync_env->cct, conf.conn,
130 sync_env->http_manager,
131 path, nullptr /* params */,
132 doc, nullptr /* result */));
133
134 }
135 if (retcode < 0) {
136 return set_cr_error(retcode);
137 }
138 return set_cr_done();
139 }
140 return 0;
141 }
142
143};
144
145class RGWElasticHandleRemoteObjCR : public RGWCallStatRemoteObjCR {
146 const ElasticConfig& conf;
147public:
148 RGWElasticHandleRemoteObjCR(RGWDataSyncEnv *_sync_env,
149 RGWBucketInfo& _bucket_info, rgw_obj_key& _key,
150 const ElasticConfig& _conf) : RGWCallStatRemoteObjCR(_sync_env, _bucket_info, _key),
151 conf(_conf) {
152 }
153
154 ~RGWElasticHandleRemoteObjCR() override {}
155
156 RGWStatRemoteObjCBCR *allocate_callback() override {
157 return new RGWElasticHandleRemoteObjCBCR(sync_env, bucket_info, key, conf);
158 }
159};
160
161class RGWElasticRemoveRemoteObjCBCR : public RGWCoroutine {
162 RGWDataSyncEnv *sync_env;
163 RGWBucketInfo bucket_info;
164 rgw_obj_key key;
165 ceph::real_time mtime;
166 const ElasticConfig& conf;
167public:
168 RGWElasticRemoveRemoteObjCBCR(RGWDataSyncEnv *_sync_env,
169 RGWBucketInfo& _bucket_info, rgw_obj_key& _key, const ceph::real_time& _mtime,
170 const ElasticConfig& _conf) : RGWCoroutine(_sync_env->cct), sync_env(_sync_env),
171 bucket_info(_bucket_info), key(_key),
172 mtime(_mtime), conf(_conf) {}
173 int operate() override {
174 reenter(this) {
175 ldout(sync_env->cct, 0) << ": remove remote obj: z=" << sync_env->source_zone
176 << " b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << dendl;
177 yield {
178 string path = es_get_obj_path(sync_env->store->get_realm(), bucket_info, key);
179
180 call(new RGWDeleteRESTResourceCR(sync_env->cct, conf.conn,
181 sync_env->http_manager,
182 path, nullptr /* params */));
183 }
184 if (retcode < 0) {
185 return set_cr_error(retcode);
186 }
187 return set_cr_done();
188 }
189 return 0;
190 }
191
192};
193
194class RGWElasticDataSyncModule : public RGWDataSyncModule {
195 ElasticConfig conf;
196public:
197 RGWElasticDataSyncModule(CephContext *cct, const string& elastic_endpoint) {
198 conf.id = string("elastic:") + elastic_endpoint;
199 conf.conn = new RGWRESTConn(cct, nullptr, conf.id, { elastic_endpoint });
200 }
201 ~RGWElasticDataSyncModule() override {
202 delete conf.conn;
203 }
204
205 RGWCoroutine *sync_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, uint64_t versioned_epoch) override {
206 ldout(sync_env->cct, 0) << conf.id << ": sync_object: b=" << bucket_info.bucket << " k=" << key << " versioned_epoch=" << versioned_epoch << dendl;
207 return new RGWElasticHandleRemoteObjCR(sync_env, bucket_info, key, conf);
208 }
209 RGWCoroutine *remove_object(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime, bool versioned, uint64_t versioned_epoch) override {
210 /* versioned and versioned epoch params are useless in the elasticsearch backend case */
211 ldout(sync_env->cct, 0) << conf.id << ": rm_object: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
212 return new RGWElasticRemoveRemoteObjCBCR(sync_env, bucket_info, key, mtime, conf);
213 }
214 RGWCoroutine *create_delete_marker(RGWDataSyncEnv *sync_env, RGWBucketInfo& bucket_info, rgw_obj_key& key, real_time& mtime,
215 rgw_bucket_entry_owner& owner, bool versioned, uint64_t versioned_epoch) override {
216 ldout(sync_env->cct, 0) << conf.id << ": create_delete_marker: b=" << bucket_info.bucket << " k=" << key << " mtime=" << mtime
217 << " versioned=" << versioned << " versioned_epoch=" << versioned_epoch << dendl;
218 return NULL;
219 }
220};
221
222class RGWElasticSyncModuleInstance : public RGWSyncModuleInstance {
223 RGWElasticDataSyncModule data_handler;
224public:
225 RGWElasticSyncModuleInstance(CephContext *cct, const string& endpoint) : data_handler(cct, endpoint) {}
226 RGWDataSyncModule *get_data_handler() override {
227 return &data_handler;
228 }
229};
230
231int RGWElasticSyncModule::create_instance(CephContext *cct, map<string, string>& config, RGWSyncModuleInstanceRef *instance) {
232 string endpoint;
233 auto i = config.find("endpoint");
234 if (i != config.end()) {
235 endpoint = i->second;
236 }
237 instance->reset(new RGWElasticSyncModuleInstance(cct, endpoint));
238 return 0;
239}
240