]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_sync_module_es_rest.cc
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / rgw / rgw_sync_module_es_rest.cc
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab
3
4 #include "rgw_sync_module_es.h"
5 #include "rgw_sync_module_es_rest.h"
6 #include "rgw_es_query.h"
7 #include "rgw_op.h"
8 #include "rgw_rest.h"
9 #include "rgw_rest_s3.h"
10
11 #define dout_context g_ceph_context
12 #define dout_subsys ceph_subsys_rgw
13
14 struct es_index_obj_response {
15 string bucket;
16 rgw_obj_key key;
17 uint64_t versioned_epoch{0};
18 ACLOwner owner;
19 set<string> read_permissions;
20
21 struct {
22 uint64_t size{0};
23 ceph::real_time mtime;
24 string etag;
25 string content_type;
26 string storage_class;
27 map<string, string> custom_str;
28 map<string, int64_t> custom_int;
29 map<string, string> custom_date;
30
31 template <class T>
32 struct _custom_entry {
33 string name;
34 T value;
35 void decode_json(JSONObj *obj) {
36 JSONDecoder::decode_json("name", name, obj);
37 JSONDecoder::decode_json("value", value, obj);
38 }
39 };
40
41 void decode_json(JSONObj *obj) {
42 JSONDecoder::decode_json("size", size, obj);
43 string mtime_str;
44 JSONDecoder::decode_json("mtime", mtime_str, obj);
45 parse_time(mtime_str.c_str(), &mtime);
46 JSONDecoder::decode_json("etag", etag, obj);
47 JSONDecoder::decode_json("content_type", content_type, obj);
48 JSONDecoder::decode_json("storage_class", storage_class, obj);
49 list<_custom_entry<string> > str_entries;
50 JSONDecoder::decode_json("custom-string", str_entries, obj);
51 for (auto& e : str_entries) {
52 custom_str[e.name] = e.value;
53 }
54 list<_custom_entry<int64_t> > int_entries;
55 JSONDecoder::decode_json("custom-int", int_entries, obj);
56 for (auto& e : int_entries) {
57 custom_int[e.name] = e.value;
58 }
59 list<_custom_entry<string> > date_entries;
60 JSONDecoder::decode_json("custom-date", date_entries, obj);
61 for (auto& e : date_entries) {
62 custom_date[e.name] = e.value;
63 }
64 }
65 } meta;
66
67 void decode_json(JSONObj *obj) {
68 JSONDecoder::decode_json("bucket", bucket, obj);
69 JSONDecoder::decode_json("name", key.name, obj);
70 JSONDecoder::decode_json("instance", key.instance, obj);
71 JSONDecoder::decode_json("versioned_epoch", versioned_epoch, obj);
72 JSONDecoder::decode_json("permissions", read_permissions, obj);
73 JSONDecoder::decode_json("owner", owner, obj);
74 JSONDecoder::decode_json("meta", meta, obj);
75 }
76 };
77
78 struct es_search_response {
79 uint32_t took;
80 bool timed_out;
81 struct {
82 uint32_t total;
83 uint32_t successful;
84 uint32_t failed;
85 void decode_json(JSONObj *obj) {
86 JSONDecoder::decode_json("total", total, obj);
87 JSONDecoder::decode_json("successful", successful, obj);
88 JSONDecoder::decode_json("failed", failed, obj);
89 }
90 } shards;
91 struct obj_hit {
92 string index;
93 string type;
94 string id;
95 // double score
96 es_index_obj_response source;
97 void decode_json(JSONObj *obj) {
98 JSONDecoder::decode_json("_index", index, obj);
99 JSONDecoder::decode_json("_type", type, obj);
100 JSONDecoder::decode_json("_id", id, obj);
101 JSONDecoder::decode_json("_source", source, obj);
102 }
103 };
104 struct {
105 uint32_t total;
106 // double max_score;
107 list<obj_hit> hits;
108 void decode_json(JSONObj *obj) {
109 JSONDecoder::decode_json("total", total, obj);
110 // JSONDecoder::decode_json("max_score", max_score, obj);
111 JSONDecoder::decode_json("hits", hits, obj);
112 }
113 } hits;
114 void decode_json(JSONObj *obj) {
115 JSONDecoder::decode_json("took", took, obj);
116 JSONDecoder::decode_json("timed_out", timed_out, obj);
117 JSONDecoder::decode_json("_shards", shards, obj);
118 JSONDecoder::decode_json("hits", hits, obj);
119 }
120 };
121
122 class RGWMetadataSearchOp : public RGWOp {
123 RGWSyncModuleInstanceRef sync_module_ref;
124 RGWElasticSyncModuleInstance *es_module;
125 protected:
126 string expression;
127 string custom_prefix;
128 #define MAX_KEYS_DEFAULT 100
129 uint64_t max_keys{MAX_KEYS_DEFAULT};
130 string marker_str;
131 uint64_t marker{0};
132 string next_marker;
133 bool is_truncated{false};
134 string err;
135
136 es_search_response response;
137
138 public:
139 RGWMetadataSearchOp(const RGWSyncModuleInstanceRef& sync_module) : sync_module_ref(sync_module) {
140 es_module = static_cast<RGWElasticSyncModuleInstance *>(sync_module_ref.get());
141 }
142
143 int verify_permission() override {
144 return 0;
145 }
146 virtual int get_params() = 0;
147 void pre_exec() override;
148 void execute() override;
149
150 const char* name() const override { return "metadata_search"; }
151 virtual RGWOpType get_type() override { return RGW_OP_METADATA_SEARCH; }
152 virtual uint32_t op_mask() override { return RGW_OP_TYPE_READ; }
153 };
154
155 void RGWMetadataSearchOp::pre_exec()
156 {
157 rgw_bucket_object_pre_exec(s);
158 }
159
160 void RGWMetadataSearchOp::execute()
161 {
162 op_ret = get_params();
163 if (op_ret < 0)
164 return;
165
166 list<pair<string, string> > conds;
167
168 if (!s->user->system) {
169 conds.push_back(make_pair("permissions", s->user->user_id.to_str()));
170 }
171
172 if (!s->bucket_name.empty()) {
173 conds.push_back(make_pair("bucket", s->bucket_name));
174 }
175
176 ESQueryCompiler es_query(expression, &conds, custom_prefix);
177
178 static map<string, string, ltstr_nocase> aliases = {
179 { "bucket", "bucket" }, /* forces lowercase */
180 { "name", "name" },
181 { "key", "name" },
182 { "instance", "instance" },
183 { "etag", "meta.etag" },
184 { "size", "meta.size" },
185 { "mtime", "meta.mtime" },
186 { "lastmodified", "meta.mtime" },
187 { "last_modified", "meta.mtime" },
188 { "contenttype", "meta.content_type" },
189 { "content_type", "meta.content_type" },
190 { "storageclass", "meta.storage_class" },
191 { "storage_class", "meta.storage_class" },
192 };
193 es_query.set_field_aliases(&aliases);
194
195 static map<string, ESEntityTypeMap::EntityType> generic_map = { {"bucket", ESEntityTypeMap::ES_ENTITY_STR},
196 {"name", ESEntityTypeMap::ES_ENTITY_STR},
197 {"instance", ESEntityTypeMap::ES_ENTITY_STR},
198 {"permissions", ESEntityTypeMap::ES_ENTITY_STR},
199 {"meta.etag", ESEntityTypeMap::ES_ENTITY_STR},
200 {"meta.content_type", ESEntityTypeMap::ES_ENTITY_STR},
201 {"meta.mtime", ESEntityTypeMap::ES_ENTITY_DATE},
202 {"meta.size", ESEntityTypeMap::ES_ENTITY_INT},
203 {"meta.storage_class", ESEntityTypeMap::ES_ENTITY_STR} };
204 ESEntityTypeMap gm(generic_map);
205 es_query.set_generic_type_map(&gm);
206
207 static set<string> restricted_fields = { {"permissions"} };
208 es_query.set_restricted_fields(&restricted_fields);
209
210 map<string, ESEntityTypeMap::EntityType> custom_map;
211 for (auto& i : s->bucket_info.mdsearch_config) {
212 custom_map[i.first] = (ESEntityTypeMap::EntityType)i.second;
213 }
214
215 ESEntityTypeMap em(custom_map);
216 es_query.set_custom_type_map(&em);
217
218 bool valid = es_query.compile(&err);
219 if (!valid) {
220 ldout(s->cct, 10) << "invalid query, failed generating request json" << dendl;
221 op_ret = -EINVAL;
222 return;
223 }
224
225 JSONFormatter f;
226 encode_json("root", es_query, &f);
227
228 RGWRESTConn *conn = es_module->get_rest_conn();
229
230 bufferlist in;
231 bufferlist out;
232
233 stringstream ss;
234
235 f.flush(ss);
236 in.append(ss.str());
237
238 string resource = es_module->get_index_path() + "/_search";
239 param_vec_t params;
240 static constexpr int BUFSIZE = 32;
241 char buf[BUFSIZE];
242 snprintf(buf, sizeof(buf), "%lld", (long long)max_keys);
243 params.push_back(param_pair_t("size", buf));
244 if (marker > 0) {
245 params.push_back(param_pair_t("from", marker_str.c_str()));
246 }
247 ldout(s->cct, 20) << "sending request to elasticsearch, payload=" << string(in.c_str(), in.length()) << dendl;
248 auto& extra_headers = es_module->get_request_headers();
249 op_ret = conn->get_resource(resource, &params, &extra_headers, out, &in);
250 if (op_ret < 0) {
251 ldout(s->cct, 0) << "ERROR: failed to fetch resource (r=" << resource << ", ret=" << op_ret << ")" << dendl;
252 return;
253 }
254
255 ldout(s->cct, 20) << "response: " << string(out.c_str(), out.length()) << dendl;
256
257 JSONParser jparser;
258 if (!jparser.parse(out.c_str(), out.length())) {
259 ldout(s->cct, 0) << "ERROR: failed to parse elasticsearch response" << dendl;
260 op_ret = -EINVAL;
261 return;
262 }
263
264 try {
265 decode_json_obj(response, &jparser);
266 } catch (JSONDecoder::err& e) {
267 ldout(s->cct, 0) << "ERROR: failed to decode JSON input: " << e.message << dendl;
268 op_ret = -EINVAL;
269 return;
270 }
271
272 }
273
274 class RGWMetadataSearch_ObjStore_S3 : public RGWMetadataSearchOp {
275 public:
276 explicit RGWMetadataSearch_ObjStore_S3(const RGWSyncModuleInstanceRef& _sync_module) : RGWMetadataSearchOp(_sync_module) {
277 custom_prefix = "x-amz-meta-";
278 }
279
280 int get_params() override {
281 expression = s->info.args.get("query");
282 bool exists;
283 string max_keys_str = s->info.args.get("max-keys", &exists);
284 #define MAX_KEYS_MAX 10000
285 if (exists) {
286 string err;
287 max_keys = strict_strtoll(max_keys_str.c_str(), 10, &err);
288 if (!err.empty()) {
289 return -EINVAL;
290 }
291 if (max_keys > MAX_KEYS_MAX) {
292 max_keys = MAX_KEYS_MAX;
293 }
294 }
295 marker_str = s->info.args.get("marker", &exists);
296 if (exists) {
297 string err;
298 marker = strict_strtoll(marker_str.c_str(), 10, &err);
299 if (!err.empty()) {
300 return -EINVAL;
301 }
302 }
303 uint64_t nm = marker + max_keys;
304 static constexpr int BUFSIZE = 32;
305 char buf[BUFSIZE];
306 snprintf(buf, sizeof(buf), "%lld", (long long)nm);
307 next_marker = buf;
308 return 0;
309 }
310 void send_response() override {
311 if (op_ret) {
312 s->err.message = err;
313 set_req_state_err(s, op_ret);
314 }
315 dump_errno(s);
316 end_header(s, this, "application/xml");
317
318 if (op_ret < 0) {
319 return;
320 }
321
322 is_truncated = (response.hits.hits.size() >= max_keys);
323
324 s->formatter->open_object_section("SearchMetadataResponse");
325 s->formatter->dump_string("Marker", marker_str);
326 s->formatter->dump_string("IsTruncated", (is_truncated ? "true" : "false"));
327 if (is_truncated) {
328 s->formatter->dump_string("NextMarker", next_marker);
329 }
330 if (s->format == RGW_FORMAT_JSON) {
331 s->formatter->open_array_section("Objects");
332 }
333 for (auto& i : response.hits.hits) {
334 s->formatter->open_object_section("Contents");
335 es_index_obj_response& e = i.source;
336 s->formatter->dump_string("Bucket", e.bucket);
337 s->formatter->dump_string("Key", e.key.name);
338 string instance = (!e.key.instance.empty() ? e.key.instance : "null");
339 s->formatter->dump_string("Instance", instance.c_str());
340 s->formatter->dump_int("VersionedEpoch", e.versioned_epoch);
341 dump_time(s, "LastModified", &e.meta.mtime);
342 s->formatter->dump_int("Size", e.meta.size);
343 s->formatter->dump_format("ETag", "\"%s\"", e.meta.etag.c_str());
344 s->formatter->dump_string("ContentType", e.meta.content_type.c_str());
345 s->formatter->dump_string("StorageClass", e.meta.storage_class.c_str());
346 dump_owner(s, e.owner.get_id(), e.owner.get_display_name());
347 s->formatter->open_array_section("CustomMetadata");
348 for (auto& m : e.meta.custom_str) {
349 s->formatter->open_object_section("Entry");
350 s->formatter->dump_string("Name", m.first.c_str());
351 s->formatter->dump_string("Value", m.second);
352 s->formatter->close_section();
353 }
354 for (auto& m : e.meta.custom_int) {
355 s->formatter->open_object_section("Entry");
356 s->formatter->dump_string("Name", m.first.c_str());
357 s->formatter->dump_int("Value", m.second);
358 s->formatter->close_section();
359 }
360 for (auto& m : e.meta.custom_date) {
361 s->formatter->open_object_section("Entry");
362 s->formatter->dump_string("Name", m.first.c_str());
363 s->formatter->dump_string("Value", m.second);
364 s->formatter->close_section();
365 }
366 s->formatter->close_section();
367 rgw_flush_formatter(s, s->formatter);
368 s->formatter->close_section();
369 };
370 if (s->format == RGW_FORMAT_JSON) {
371 s->formatter->close_section();
372 }
373 s->formatter->close_section();
374 rgw_flush_formatter_and_reset(s, s->formatter);
375 }
376 };
377
378 class RGWHandler_REST_MDSearch_S3 : public RGWHandler_REST_S3 {
379 protected:
380 RGWOp *op_get() override {
381 if (s->info.args.exists("query")) {
382 return new RGWMetadataSearch_ObjStore_S3(store->get_sync_module());
383 }
384 if (!s->init_state.url_bucket.empty() &&
385 s->info.args.exists("mdsearch")) {
386 return new RGWGetBucketMetaSearch_ObjStore_S3;
387 }
388 return nullptr;
389 }
390 RGWOp *op_head() override {
391 return nullptr;
392 }
393 RGWOp *op_post() override {
394 return nullptr;
395 }
396 public:
397 explicit RGWHandler_REST_MDSearch_S3(const rgw::auth::StrategyRegistry& auth_registry) : RGWHandler_REST_S3(auth_registry) {}
398 virtual ~RGWHandler_REST_MDSearch_S3() {}
399 };
400
401
402 RGWHandler_REST* RGWRESTMgr_MDSearch_S3::get_handler(struct req_state* const s,
403 const rgw::auth::StrategyRegistry& auth_registry,
404 const std::string& frontend_prefix)
405 {
406 int ret =
407 RGWHandler_REST_S3::init_from_header(s,
408 RGW_FORMAT_XML, true);
409 if (ret < 0) {
410 return nullptr;
411 }
412
413 if (!s->object.empty()) {
414 return nullptr;
415 }
416
417 RGWHandler_REST *handler = new RGWHandler_REST_MDSearch_S3(auth_registry);
418
419 ldout(s->cct, 20) << __func__ << " handler=" << typeid(*handler).name()
420 << dendl;
421 return handler;
422 }
423