]> git.proxmox.com Git - ceph.git/blob - ceph/src/rgw/rgw_cr_rest.h
import quincy beta 17.1.0
[ceph.git] / ceph / src / rgw / rgw_cr_rest.h
1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
3
4 #pragma once
5
6 #include <boost/intrusive_ptr.hpp>
7 #include <mutex>
8 #include "include/ceph_assert.h" // boost header clobbers our assert.h
9
10 #include "rgw_coroutine.h"
11 #include "rgw_rest_conn.h"
12
13
14 struct rgw_rest_obj {
15 rgw_obj_key key;
16 uint64_t content_len;
17 std::map<std::string, std::string> attrs;
18 std::map<std::string, std::string> custom_attrs;
19 RGWAccessControlPolicy acls;
20
21 void init(const rgw_obj_key& _key) {
22 key = _key;
23 }
24 };
25
26 class RGWReadRawRESTResourceCR : public RGWSimpleCoroutine {
27 bufferlist *result;
28 protected:
29 RGWRESTConn *conn;
30 RGWHTTPManager *http_manager;
31 std::string path;
32 param_vec_t params;
33 param_vec_t extra_headers;
34 public:
35 boost::intrusive_ptr<RGWRESTReadResource> http_op;
36 RGWReadRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
37 RGWHTTPManager *_http_manager, const std::string& _path,
38 rgw_http_param_pair *params, bufferlist *_result)
39 : RGWSimpleCoroutine(_cct), result(_result), conn(_conn), http_manager(_http_manager),
40 path(_path), params(make_param_list(params))
41 {}
42
43 RGWReadRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
44 RGWHTTPManager *_http_manager, const std::string& _path,
45 rgw_http_param_pair *params)
46 : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager),
47 path(_path), params(make_param_list(params))
48 {}
49
50 RGWReadRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
51 RGWHTTPManager *_http_manager, const std::string& _path,
52 rgw_http_param_pair *params, param_vec_t &hdrs)
53 : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager),
54 path(_path), params(make_param_list(params)),
55 extra_headers(hdrs)
56 {}
57
58 RGWReadRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
59 RGWHTTPManager *_http_manager, const std::string& _path,
60 rgw_http_param_pair *params,
61 std::map <std::string, std::string> *hdrs)
62 : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager),
63 path(_path), params(make_param_list(params)),
64 extra_headers(make_param_list(hdrs))
65 {}
66
67
68 ~RGWReadRawRESTResourceCR() override {
69 request_cleanup();
70 }
71
72 int send_request(const DoutPrefixProvider *dpp) override {
73 auto op = boost::intrusive_ptr<RGWRESTReadResource>(
74 new RGWRESTReadResource(conn, path, params, &extra_headers, http_manager));
75
76 init_new_io(op.get());
77
78 int ret = op->aio_read(dpp);
79 if (ret < 0) {
80 log_error() << "failed to send http operation: " << op->to_str()
81 << " ret=" << ret << std::endl;
82 op->put();
83 return ret;
84 }
85 std::swap(http_op, op); // store reference in http_op on success
86 return 0;
87 }
88
89
90
91 virtual int wait_result() {
92 return http_op->wait(result, null_yield);
93 }
94
95 int request_complete() override {
96 int ret;
97
98 ret = wait_result();
99
100 auto op = std::move(http_op); // release ref on return
101 if (ret < 0) {
102 error_stream << "http operation failed: " << op->to_str()
103 << " status=" << op->get_http_status() << std::endl;
104 op->put();
105 return ret;
106 }
107 op->put();
108 return 0;
109 }
110
111 void request_cleanup() override {
112 if (http_op) {
113 http_op->put();
114 http_op = NULL;
115 }
116 }
117
118 };
119
120
121 template <class T>
122 class RGWReadRESTResourceCR : public RGWReadRawRESTResourceCR {
123 T *result;
124 public:
125 RGWReadRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
126 RGWHTTPManager *_http_manager, const std::string& _path,
127 rgw_http_param_pair *params, T *_result)
128 : RGWReadRawRESTResourceCR(_cct, _conn, _http_manager, _path, params), result(_result)
129 {}
130
131 RGWReadRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
132 RGWHTTPManager *_http_manager, const std::string& _path,
133 rgw_http_param_pair *params,
134 std::map <std::string, std::string> *hdrs,
135 T *_result)
136 : RGWReadRawRESTResourceCR(_cct, _conn, _http_manager, _path, params, hdrs), result(_result)
137 {}
138
139 int wait_result() override {
140 return http_op->wait(result, null_yield);
141 }
142
143 };
144
145 template <class T, class E = int>
146 class RGWSendRawRESTResourceCR: public RGWSimpleCoroutine {
147 protected:
148 RGWRESTConn *conn;
149 RGWHTTPManager *http_manager;
150 std::string method;
151 std::string path;
152 param_vec_t params;
153 param_vec_t headers;
154 std::map<std::string, std::string> *attrs;
155 T *result;
156 E *err_result;
157 bufferlist input_bl;
158 bool send_content_length=false;
159 boost::intrusive_ptr<RGWRESTSendResource> http_op;
160
161 public:
162 RGWSendRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
163 RGWHTTPManager *_http_manager,
164 const std::string& _method, const std::string& _path,
165 rgw_http_param_pair *_params,
166 std::map<std::string, std::string> *_attrs,
167 bufferlist& _input, T *_result,
168 bool _send_content_length,
169 E *_err_result = nullptr)
170 : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager),
171 method(_method), path(_path), params(make_param_list(_params)),
172 headers(make_param_list(_attrs)), attrs(_attrs),
173 result(_result), err_result(_err_result),
174 input_bl(_input), send_content_length(_send_content_length) {}
175
176 RGWSendRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
177 RGWHTTPManager *_http_manager,
178 const std::string& _method, const std::string& _path,
179 rgw_http_param_pair *_params, std::map<std::string, std::string> *_attrs,
180 T *_result, E *_err_result = nullptr)
181 : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager),
182 method(_method), path(_path), params(make_param_list(_params)), headers(make_param_list(_attrs)), attrs(_attrs), result(_result),
183 err_result(_err_result) {}
184
185 ~RGWSendRawRESTResourceCR() override {
186 request_cleanup();
187 }
188
189 int send_request(const DoutPrefixProvider *dpp) override {
190 auto op = boost::intrusive_ptr<RGWRESTSendResource>(
191 new RGWRESTSendResource(conn, method, path, params, &headers, http_manager));
192
193 init_new_io(op.get());
194
195 int ret = op->aio_send(dpp, input_bl);
196 if (ret < 0) {
197 ldpp_subdout(dpp, rgw, 0) << "ERROR: failed to send request" << dendl;
198 op->put();
199 return ret;
200 }
201 std::swap(http_op, op); // store reference in http_op on success
202 return 0;
203 }
204
205 int request_complete() override {
206 int ret;
207 if (result || err_result) {
208 ret = http_op->wait(result, null_yield, err_result);
209 } else {
210 bufferlist bl;
211 ret = http_op->wait(&bl, null_yield);
212 }
213 auto op = std::move(http_op); // release ref on return
214 if (ret < 0) {
215 error_stream << "http operation failed: " << op->to_str()
216 << " status=" << op->get_http_status() << std::endl;
217 lsubdout(cct, rgw, 5) << "failed to wait for op, ret=" << ret
218 << ": " << op->to_str() << dendl;
219 op->put();
220 return ret;
221 }
222 op->put();
223 return 0;
224 }
225
226 void request_cleanup() override {
227 if (http_op) {
228 http_op->put();
229 http_op = NULL;
230 }
231 }
232 };
233
234 template <class S, class T, class E = int>
235 class RGWSendRESTResourceCR : public RGWSendRawRESTResourceCR<T, E> {
236 public:
237 RGWSendRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
238 RGWHTTPManager *_http_manager,
239 const std::string& _method, const std::string& _path,
240 rgw_http_param_pair *_params, std::map<std::string, std::string> *_attrs,
241 S& _input, T *_result, E *_err_result = nullptr)
242 : RGWSendRawRESTResourceCR<T, E>(_cct, _conn, _http_manager, _method, _path, _params, _attrs, _result, _err_result) {
243
244 JSONFormatter jf;
245 encode_json("data", _input, &jf);
246 std::stringstream ss;
247 jf.flush(ss);
248 //bufferlist bl;
249 this->input_bl.append(ss.str());
250 }
251
252 };
253
254 template <class S, class T, class E = int>
255 class RGWPostRESTResourceCR : public RGWSendRESTResourceCR<S, T, E> {
256 public:
257 RGWPostRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
258 RGWHTTPManager *_http_manager,
259 const std::string& _path,
260 rgw_http_param_pair *_params, S& _input,
261 T *_result, E *_err_result = nullptr)
262 : RGWSendRESTResourceCR<S, T, E>(_cct, _conn, _http_manager,
263 "POST", _path,
264 _params, nullptr, _input,
265 _result, _err_result) {}
266 };
267
268 template <class T, class E = int>
269 class RGWPutRawRESTResourceCR: public RGWSendRawRESTResourceCR <T, E> {
270 public:
271 RGWPutRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
272 RGWHTTPManager *_http_manager,
273 const std::string& _path,
274 rgw_http_param_pair *_params, bufferlist& _input,
275 T *_result, E *_err_result = nullptr)
276 : RGWSendRawRESTResourceCR<T, E>(_cct, _conn, _http_manager, "PUT", _path,
277 _params, nullptr, _input, _result, true, _err_result) {}
278
279 };
280
281 template <class T, class E = int>
282 class RGWPostRawRESTResourceCR: public RGWSendRawRESTResourceCR <T, E> {
283 public:
284 RGWPostRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
285 RGWHTTPManager *_http_manager,
286 const std::string& _path,
287 rgw_http_param_pair *_params,
288 std::map<std::string, std::string> * _attrs,
289 bufferlist& _input,
290 T *_result, E *_err_result = nullptr)
291 : RGWSendRawRESTResourceCR<T, E>(_cct, _conn, _http_manager, "POST", _path,
292 _params, _attrs, _input, _result, true, _err_result) {}
293
294 };
295
296
297 template <class S, class T, class E = int>
298 class RGWPutRESTResourceCR : public RGWSendRESTResourceCR<S, T, E> {
299 public:
300 RGWPutRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
301 RGWHTTPManager *_http_manager,
302 const std::string& _path,
303 rgw_http_param_pair *_params, S& _input,
304 T *_result, E *_err_result = nullptr)
305 : RGWSendRESTResourceCR<S, T, E>(_cct, _conn, _http_manager,
306 "PUT", _path,
307 _params, nullptr, _input,
308 _result, _err_result) {}
309
310 RGWPutRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
311 RGWHTTPManager *_http_manager,
312 const std::string& _path,
313 rgw_http_param_pair *_params,
314 std::map<std::string, std::string> *_attrs,
315 S& _input, T *_result, E *_err_result = nullptr)
316 : RGWSendRESTResourceCR<S, T, E>(_cct, _conn, _http_manager,
317 "PUT", _path,
318 _params, _attrs, _input,
319 _result, _err_result) {}
320
321 };
322
323 class RGWDeleteRESTResourceCR : public RGWSimpleCoroutine {
324 RGWRESTConn *conn;
325 RGWHTTPManager *http_manager;
326 std::string path;
327 param_vec_t params;
328
329 boost::intrusive_ptr<RGWRESTDeleteResource> http_op;
330
331 public:
332 RGWDeleteRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
333 RGWHTTPManager *_http_manager,
334 const std::string& _path,
335 rgw_http_param_pair *_params)
336 : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager),
337 path(_path), params(make_param_list(_params))
338 {}
339
340 ~RGWDeleteRESTResourceCR() override {
341 request_cleanup();
342 }
343
344 int send_request(const DoutPrefixProvider *dpp) override {
345 auto op = boost::intrusive_ptr<RGWRESTDeleteResource>(
346 new RGWRESTDeleteResource(conn, path, params, nullptr, http_manager));
347
348 init_new_io(op.get());
349
350 bufferlist bl;
351
352 int ret = op->aio_send(dpp, bl);
353 if (ret < 0) {
354 ldpp_subdout(dpp, rgw, 0) << "ERROR: failed to send DELETE request" << dendl;
355 op->put();
356 return ret;
357 }
358 std::swap(http_op, op); // store reference in http_op on success
359 return 0;
360 }
361
362 int request_complete() override {
363 int ret;
364 bufferlist bl;
365 ret = http_op->wait(&bl, null_yield);
366 auto op = std::move(http_op); // release ref on return
367 if (ret < 0) {
368 error_stream << "http operation failed: " << op->to_str()
369 << " status=" << op->get_http_status() << std::endl;
370 lsubdout(cct, rgw, 5) << "failed to wait for op, ret=" << ret
371 << ": " << op->to_str() << dendl;
372 op->put();
373 return ret;
374 }
375 op->put();
376 return 0;
377 }
378
379 void request_cleanup() override {
380 if (http_op) {
381 http_op->put();
382 http_op = NULL;
383 }
384 }
385 };
386
387 class RGWCRHTTPGetDataCB : public RGWHTTPStreamRWRequest::ReceiveCB {
388 ceph::mutex lock = ceph::make_mutex("RGWCRHTTPGetDataCB");
389 RGWCoroutinesEnv *env;
390 RGWCoroutine *cr;
391 RGWHTTPStreamRWRequest *req;
392 rgw_io_id io_id;
393 bufferlist data;
394 bufferlist extra_data;
395 bool got_all_extra_data{false};
396 bool paused{false};
397 bool notified{false};
398 public:
399 RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr, RGWHTTPStreamRWRequest *_req);
400
401 int handle_data(bufferlist& bl, bool *pause) override;
402
403 void claim_data(bufferlist *dest, uint64_t max);
404
405 bufferlist& get_extra_data() {
406 return extra_data;
407 }
408
409 bool has_data() {
410 return (data.length() > 0);
411 }
412
413 bool has_all_extra_data() {
414 return got_all_extra_data;
415 }
416 };
417
418
419 class RGWStreamReadResourceCRF {
420 protected:
421 boost::asio::coroutine read_state;
422
423 public:
424 virtual int init(const DoutPrefixProvider *dpp) = 0;
425 virtual int read(const DoutPrefixProvider *dpp, bufferlist *data, uint64_t max, bool *need_retry) = 0; /* reentrant */
426 virtual int decode_rest_obj(const DoutPrefixProvider *dpp, std::map<std::string, std::string>& headers, bufferlist& extra_data) = 0;
427 virtual bool has_attrs() = 0;
428 virtual void get_attrs(std::map<std::string, std::string> *attrs) = 0;
429 virtual ~RGWStreamReadResourceCRF() = default;
430 };
431
432 class RGWStreamWriteResourceCRF {
433 protected:
434 boost::asio::coroutine write_state;
435 boost::asio::coroutine drain_state;
436
437 public:
438 virtual int init() = 0;
439 virtual void send_ready(const DoutPrefixProvider *dpp, const rgw_rest_obj& rest_obj) = 0;
440 virtual int send() = 0;
441 virtual int write(bufferlist& data, bool *need_retry) = 0; /* reentrant */
442 virtual int drain_writes(bool *need_retry) = 0; /* reentrant */
443
444 virtual ~RGWStreamWriteResourceCRF() = default;
445 };
446
447 class RGWStreamReadHTTPResourceCRF : public RGWStreamReadResourceCRF {
448 CephContext *cct;
449 RGWCoroutinesEnv *env;
450 RGWCoroutine *caller;
451 RGWHTTPManager *http_manager;
452
453 RGWHTTPStreamRWRequest *req{nullptr};
454
455 std::optional<RGWCRHTTPGetDataCB> in_cb;
456
457 bufferlist extra_data;
458
459 bool got_attrs{false};
460 bool got_extra_data{false};
461
462 rgw_io_id io_read_mask;
463
464 protected:
465 rgw_rest_obj rest_obj;
466
467 struct range_info {
468 bool is_set{false};
469 uint64_t ofs;
470 uint64_t size;
471 } range;
472
473 ceph::real_time mtime;
474 std::string etag;
475
476 public:
477 RGWStreamReadHTTPResourceCRF(CephContext *_cct,
478 RGWCoroutinesEnv *_env,
479 RGWCoroutine *_caller,
480 RGWHTTPManager *_http_manager,
481 const rgw_obj_key& _src_key) : cct(_cct),
482 env(_env),
483 caller(_caller),
484 http_manager(_http_manager) {
485 rest_obj.init(_src_key);
486 }
487 ~RGWStreamReadHTTPResourceCRF();
488
489 int init(const DoutPrefixProvider *dpp) override;
490 int read(const DoutPrefixProvider *dpp, bufferlist *data, uint64_t max, bool *need_retry) override; /* reentrant */
491 int decode_rest_obj(const DoutPrefixProvider *dpp, std::map<std::string, std::string>& headers, bufferlist& extra_data) override;
492 bool has_attrs() override;
493 void get_attrs(std::map<std::string, std::string> *attrs) override;
494 bool is_done();
495 virtual bool need_extra_data() { return false; }
496
497 void set_req(RGWHTTPStreamRWRequest *r) {
498 req = r;
499 }
500
501 rgw_rest_obj& get_rest_obj() {
502 return rest_obj;
503 }
504
505 void set_range(uint64_t ofs, uint64_t size) {
506 range.is_set = true;
507 range.ofs = ofs;
508 range.size = size;
509 }
510 };
511
512 class RGWStreamWriteHTTPResourceCRF : public RGWStreamWriteResourceCRF {
513 protected:
514 RGWCoroutinesEnv *env;
515 RGWCoroutine *caller;
516 RGWHTTPManager *http_manager;
517
518 using lock_guard = std::lock_guard<std::mutex>;
519
520 std::mutex blocked_lock;
521 bool is_blocked;
522
523 RGWHTTPStreamRWRequest *req{nullptr};
524
525 struct multipart_info {
526 bool is_multipart{false};
527 std::string upload_id;
528 int part_num{0};
529 uint64_t part_size;
530 } multipart;
531
532 class WriteDrainNotify : public RGWWriteDrainCB {
533 RGWStreamWriteHTTPResourceCRF *crf;
534 public:
535 explicit WriteDrainNotify(RGWStreamWriteHTTPResourceCRF *_crf) : crf(_crf) {}
536 void notify(uint64_t pending_size) override;
537 } write_drain_notify_cb;
538
539 public:
540 RGWStreamWriteHTTPResourceCRF(CephContext *_cct,
541 RGWCoroutinesEnv *_env,
542 RGWCoroutine *_caller,
543 RGWHTTPManager *_http_manager) : env(_env),
544 caller(_caller),
545 http_manager(_http_manager),
546 write_drain_notify_cb(this) {}
547 virtual ~RGWStreamWriteHTTPResourceCRF();
548
549 int init() override {
550 return 0;
551 }
552 void send_ready(const DoutPrefixProvider *dpp, const rgw_rest_obj& rest_obj) override;
553 int send() override;
554 int write(bufferlist& data, bool *need_retry) override; /* reentrant */
555 void write_drain_notify(uint64_t pending_size);
556 int drain_writes(bool *need_retry) override; /* reentrant */
557
558 virtual void handle_headers(const std::map<std::string, std::string>& headers) {}
559
560 void set_req(RGWHTTPStreamRWRequest *r) {
561 req = r;
562 }
563
564 void set_multipart(const std::string& upload_id, int part_num, uint64_t part_size) {
565 multipart.is_multipart = true;
566 multipart.upload_id = upload_id;
567 multipart.part_num = part_num;
568 multipart.part_size = part_size;
569 }
570 };
571
572 class RGWStreamSpliceCR : public RGWCoroutine {
573 CephContext *cct;
574 RGWHTTPManager *http_manager;
575 std::string url;
576 std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
577 std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
578 bufferlist bl;
579 bool need_retry{false};
580 bool sent_attrs{false};
581 uint64_t total_read{0};
582 int ret{0};
583 public:
584 RGWStreamSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr,
585 std::shared_ptr<RGWStreamReadHTTPResourceCRF>& _in_crf,
586 std::shared_ptr<RGWStreamWriteHTTPResourceCRF>& _out_crf);
587 ~RGWStreamSpliceCR();
588
589 int operate(const DoutPrefixProvider *dpp) override;
590 };