]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_cr_rest.h
import ceph 15.2.10
[ceph.git] / ceph / src / rgw / rgw_cr_rest.h
CommitLineData
11fdf7f2 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
9f95a23c 2// vim: ts=8 sw=2 smarttab ft=cpp
11fdf7f2 3
9f95a23c 4#pragma once
7c673cae
FG
5
6#include <boost/intrusive_ptr.hpp>
11fdf7f2
TL
7#include <mutex>
8#include "include/ceph_assert.h" // boost header clobbers our assert.h
7c673cae
FG
9
10#include "rgw_coroutine.h"
11#include "rgw_rest_conn.h"
12
11fdf7f2
TL
13
14struct rgw_rest_obj {
15 rgw_obj_key key;
16 uint64_t content_len;
17 std::map<string, string> attrs;
18 std::map<string, string> custom_attrs;
19 RGWAccessControlPolicy acls;
20
21 void init(const rgw_obj_key& _key) {
22 key = _key;
23 }
24};
25
26class RGWReadRawRESTResourceCR : public RGWSimpleCoroutine {
27 bufferlist *result;
28 protected:
7c673cae
FG
29 RGWRESTConn *conn;
30 RGWHTTPManager *http_manager;
31 string path;
32 param_vec_t params;
a8e16298
TL
33 param_vec_t extra_headers;
34public:
7c673cae 35 boost::intrusive_ptr<RGWRESTReadResource> http_op;
11fdf7f2
TL
36 RGWReadRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
37 RGWHTTPManager *_http_manager, const string& _path,
38 rgw_http_param_pair *params, bufferlist *_result)
39 : RGWSimpleCoroutine(_cct), result(_result), conn(_conn), http_manager(_http_manager),
40 path(_path), params(make_param_list(params))
41 {}
7c673cae 42
11fdf7f2
TL
43 RGWReadRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
44 RGWHTTPManager *_http_manager, const string& _path,
45 rgw_http_param_pair *params)
46 : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager),
47 path(_path), params(make_param_list(params))
48 {}
49
50 RGWReadRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
51 RGWHTTPManager *_http_manager, const string& _path,
52 rgw_http_param_pair *params, param_vec_t &hdrs)
7c673cae 53 : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager),
11fdf7f2
TL
54 path(_path), params(make_param_list(params)),
55 extra_headers(hdrs)
7c673cae
FG
56 {}
57
11fdf7f2 58 RGWReadRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
a8e16298
TL
59 RGWHTTPManager *_http_manager, const string& _path,
60 rgw_http_param_pair *params,
11fdf7f2 61 std::map <std::string, std::string> *hdrs)
a8e16298
TL
62 : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager),
63 path(_path), params(make_param_list(params)),
11fdf7f2 64 extra_headers(make_param_list(hdrs))
a8e16298
TL
65 {}
66
67
11fdf7f2 68 ~RGWReadRawRESTResourceCR() override {
7c673cae
FG
69 request_cleanup();
70 }
71
72 int send_request() override {
73 auto op = boost::intrusive_ptr<RGWRESTReadResource>(
a8e16298 74 new RGWRESTReadResource(conn, path, params, &extra_headers, http_manager));
7c673cae 75
11fdf7f2 76 init_new_io(op.get());
7c673cae
FG
77
78 int ret = op->aio_read();
79 if (ret < 0) {
80 log_error() << "failed to send http operation: " << op->to_str()
81 << " ret=" << ret << std::endl;
82 op->put();
83 return ret;
84 }
85 std::swap(http_op, op); // store reference in http_op on success
86 return 0;
87 }
88
11fdf7f2
TL
89
90
91 virtual int wait_result() {
9f95a23c 92 return http_op->wait(result, null_yield);
11fdf7f2
TL
93 }
94
7c673cae 95 int request_complete() override {
11fdf7f2
TL
96 int ret;
97
98 ret = wait_result();
99
7c673cae
FG
100 auto op = std::move(http_op); // release ref on return
101 if (ret < 0) {
102 error_stream << "http operation failed: " << op->to_str()
11fdf7f2 103 << " status=" << op->get_http_status() << std::endl;
7c673cae
FG
104 op->put();
105 return ret;
106 }
107 op->put();
108 return 0;
109 }
110
111 void request_cleanup() override {
112 if (http_op) {
113 http_op->put();
114 http_op = NULL;
115 }
116 }
11fdf7f2 117
7c673cae
FG
118};
119
11fdf7f2
TL
120
121template <class T>
122class RGWReadRESTResourceCR : public RGWReadRawRESTResourceCR {
123 T *result;
124 public:
125 RGWReadRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
126 RGWHTTPManager *_http_manager, const string& _path,
127 rgw_http_param_pair *params, T *_result)
128 : RGWReadRawRESTResourceCR(_cct, _conn, _http_manager, _path, params), result(_result)
129 {}
130
131 RGWReadRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
132 RGWHTTPManager *_http_manager, const string& _path,
133 rgw_http_param_pair *params,
134 std::map <std::string, std::string> *hdrs,
135 T *_result)
136 : RGWReadRawRESTResourceCR(_cct, _conn, _http_manager, _path, params, hdrs), result(_result)
137 {}
138
139 int wait_result() override {
9f95a23c 140 return http_op->wait(result, null_yield);
11fdf7f2
TL
141 }
142
143};
144
145template <class T, class E = int>
146class RGWSendRawRESTResourceCR: public RGWSimpleCoroutine {
147 protected:
7c673cae
FG
148 RGWRESTConn *conn;
149 RGWHTTPManager *http_manager;
150 string method;
151 string path;
152 param_vec_t params;
a8e16298 153 param_vec_t headers;
11fdf7f2 154 map<string, string> *attrs;
7c673cae 155 T *result;
a8e16298
TL
156 E *err_result;
157 bufferlist input_bl;
11fdf7f2 158 bool send_content_length=false;
7c673cae
FG
159 boost::intrusive_ptr<RGWRESTSendResource> http_op;
160
11fdf7f2
TL
161 public:
162 RGWSendRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
163 RGWHTTPManager *_http_manager,
164 const string& _method, const string& _path,
165 rgw_http_param_pair *_params,
166 map<string, string> *_attrs,
167 bufferlist& _input, T *_result,
168 bool _send_content_length,
169 E *_err_result = nullptr)
170 : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager),
171 method(_method), path(_path), params(make_param_list(_params)),
172 headers(make_param_list(_attrs)), attrs(_attrs),
173 result(_result), err_result(_err_result),
174 input_bl(_input), send_content_length(_send_content_length) {}
175
176 RGWSendRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
177 RGWHTTPManager *_http_manager,
178 const string& _method, const string& _path,
179 rgw_http_param_pair *_params, map<string, string> *_attrs,
180 T *_result, E *_err_result = nullptr)
181 : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager),
182 method(_method), path(_path), params(make_param_list(_params)), headers(make_param_list(_attrs)), attrs(_attrs), result(_result),
183 err_result(_err_result) {}
7c673cae 184
11fdf7f2 185 ~RGWSendRawRESTResourceCR() override {
7c673cae
FG
186 request_cleanup();
187 }
188
189 int send_request() override {
190 auto op = boost::intrusive_ptr<RGWRESTSendResource>(
a8e16298 191 new RGWRESTSendResource(conn, method, path, params, &headers, http_manager));
7c673cae 192
11fdf7f2 193 init_new_io(op.get());
7c673cae 194
a8e16298 195 int ret = op->aio_send(input_bl);
7c673cae
FG
196 if (ret < 0) {
197 lsubdout(cct, rgw, 0) << "ERROR: failed to send request" << dendl;
198 op->put();
199 return ret;
200 }
201 std::swap(http_op, op); // store reference in http_op on success
202 return 0;
203 }
204
205 int request_complete() override {
206 int ret;
a8e16298 207 if (result || err_result) {
9f95a23c 208 ret = http_op->wait(result, null_yield, err_result);
7c673cae
FG
209 } else {
210 bufferlist bl;
9f95a23c 211 ret = http_op->wait(&bl, null_yield);
7c673cae
FG
212 }
213 auto op = std::move(http_op); // release ref on return
214 if (ret < 0) {
215 error_stream << "http operation failed: " << op->to_str()
216 << " status=" << op->get_http_status() << std::endl;
217 lsubdout(cct, rgw, 5) << "failed to wait for op, ret=" << ret
218 << ": " << op->to_str() << dendl;
219 op->put();
220 return ret;
221 }
222 op->put();
223 return 0;
224 }
225
226 void request_cleanup() override {
227 if (http_op) {
228 http_op->put();
229 http_op = NULL;
230 }
231 }
232};
233
11fdf7f2
TL
234template <class S, class T, class E = int>
235class RGWSendRESTResourceCR : public RGWSendRawRESTResourceCR<T, E> {
236 public:
237 RGWSendRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
238 RGWHTTPManager *_http_manager,
239 const string& _method, const string& _path,
240 rgw_http_param_pair *_params, map<string, string> *_attrs,
241 S& _input, T *_result, E *_err_result = nullptr)
242 : RGWSendRawRESTResourceCR<T, E>(_cct, _conn, _http_manager, _method, _path, _params, _attrs, _result, _err_result) {
243
244 JSONFormatter jf;
245 encode_json("data", _input, &jf);
246 std::stringstream ss;
247 jf.flush(ss);
248 //bufferlist bl;
249 this->input_bl.append(ss.str());
250 }
251
252};
253
a8e16298
TL
254template <class S, class T, class E = int>
255class RGWPostRESTResourceCR : public RGWSendRESTResourceCR<S, T, E> {
7c673cae
FG
256public:
257 RGWPostRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
258 RGWHTTPManager *_http_manager,
259 const string& _path,
a8e16298
TL
260 rgw_http_param_pair *_params, S& _input,
261 T *_result, E *_err_result = nullptr)
262 : RGWSendRESTResourceCR<S, T, E>(_cct, _conn, _http_manager,
7c673cae 263 "POST", _path,
11fdf7f2
TL
264 _params, nullptr, _input,
265 _result, _err_result) {}
266};
267
268template <class T, class E = int>
269class RGWPutRawRESTResourceCR: public RGWSendRawRESTResourceCR <T, E> {
270 public:
271 RGWPutRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
272 RGWHTTPManager *_http_manager,
273 const string& _path,
274 rgw_http_param_pair *_params, bufferlist& _input,
275 T *_result, E *_err_result = nullptr)
276 : RGWSendRawRESTResourceCR<T, E>(_cct, _conn, _http_manager, "PUT", _path,
277 _params, nullptr, _input, _result, true, _err_result) {}
278
7c673cae
FG
279};
280
11fdf7f2
TL
281template <class T, class E = int>
282class RGWPostRawRESTResourceCR: public RGWSendRawRESTResourceCR <T, E> {
283 public:
284 RGWPostRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
285 RGWHTTPManager *_http_manager,
286 const string& _path,
287 rgw_http_param_pair *_params,
288 map<string, string> * _attrs,
289 bufferlist& _input,
290 T *_result, E *_err_result = nullptr)
291 : RGWSendRawRESTResourceCR<T, E>(_cct, _conn, _http_manager, "POST", _path,
292 _params, _attrs, _input, _result, true, _err_result) {}
293
294};
295
296
a8e16298
TL
297template <class S, class T, class E = int>
298class RGWPutRESTResourceCR : public RGWSendRESTResourceCR<S, T, E> {
7c673cae
FG
299public:
300 RGWPutRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
301 RGWHTTPManager *_http_manager,
302 const string& _path,
a8e16298
TL
303 rgw_http_param_pair *_params, S& _input,
304 T *_result, E *_err_result = nullptr)
305 : RGWSendRESTResourceCR<S, T, E>(_cct, _conn, _http_manager,
306 "PUT", _path,
307 _params, nullptr, _input,
308 _result, _err_result) {}
309
310 RGWPutRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
311 RGWHTTPManager *_http_manager,
312 const string& _path,
313 rgw_http_param_pair *_params,
314 map <string, string> *_attrs,
315 S& _input, T *_result, E *_err_result = nullptr)
316 : RGWSendRESTResourceCR<S, T, E>(_cct, _conn, _http_manager,
317 "PUT", _path,
318 _params, _attrs, _input,
319 _result, _err_result) {}
11fdf7f2 320
7c673cae
FG
321};
322
323class RGWDeleteRESTResourceCR : public RGWSimpleCoroutine {
324 RGWRESTConn *conn;
325 RGWHTTPManager *http_manager;
326 string path;
327 param_vec_t params;
328
329 boost::intrusive_ptr<RGWRESTDeleteResource> http_op;
330
331public:
332 RGWDeleteRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn,
333 RGWHTTPManager *_http_manager,
334 const string& _path,
335 rgw_http_param_pair *_params)
336 : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager),
337 path(_path), params(make_param_list(_params))
338 {}
339
340 ~RGWDeleteRESTResourceCR() override {
341 request_cleanup();
342 }
343
344 int send_request() override {
345 auto op = boost::intrusive_ptr<RGWRESTDeleteResource>(
346 new RGWRESTDeleteResource(conn, path, params, nullptr, http_manager));
347
11fdf7f2 348 init_new_io(op.get());
7c673cae
FG
349
350 bufferlist bl;
351
352 int ret = op->aio_send(bl);
353 if (ret < 0) {
354 lsubdout(cct, rgw, 0) << "ERROR: failed to send DELETE request" << dendl;
355 op->put();
356 return ret;
357 }
358 std::swap(http_op, op); // store reference in http_op on success
359 return 0;
360 }
361
362 int request_complete() override {
363 int ret;
364 bufferlist bl;
9f95a23c 365 ret = http_op->wait(&bl, null_yield);
7c673cae
FG
366 auto op = std::move(http_op); // release ref on return
367 if (ret < 0) {
368 error_stream << "http operation failed: " << op->to_str()
369 << " status=" << op->get_http_status() << std::endl;
370 lsubdout(cct, rgw, 5) << "failed to wait for op, ret=" << ret
371 << ": " << op->to_str() << dendl;
372 op->put();
373 return ret;
374 }
375 op->put();
376 return 0;
377 }
378
379 void request_cleanup() override {
380 if (http_op) {
381 http_op->put();
382 http_op = NULL;
383 }
384 }
385};
386
11fdf7f2 387class RGWCRHTTPGetDataCB : public RGWHTTPStreamRWRequest::ReceiveCB {
9f95a23c 388 ceph::mutex lock = ceph::make_mutex("RGWCRHTTPGetDataCB");
11fdf7f2
TL
389 RGWCoroutinesEnv *env;
390 RGWCoroutine *cr;
391 RGWHTTPStreamRWRequest *req;
392 rgw_io_id io_id;
393 bufferlist data;
394 bufferlist extra_data;
395 bool got_all_extra_data{false};
396 bool paused{false};
397 bool notified{false};
398public:
399 RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr, RGWHTTPStreamRWRequest *_req);
400
401 int handle_data(bufferlist& bl, bool *pause) override;
402
403 void claim_data(bufferlist *dest, uint64_t max);
404
405 bufferlist& get_extra_data() {
406 return extra_data;
407 }
408
409 bool has_data() {
410 return (data.length() > 0);
411 }
412
413 bool has_all_extra_data() {
414 return got_all_extra_data;
415 }
416};
417
418
419class RGWStreamReadResourceCRF {
420protected:
421 boost::asio::coroutine read_state;
422
423public:
424 virtual int init() = 0;
425 virtual int read(bufferlist *data, uint64_t max, bool *need_retry) = 0; /* reentrant */
426 virtual int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data) = 0;
427 virtual bool has_attrs() = 0;
428 virtual void get_attrs(std::map<string, string> *attrs) = 0;
429 virtual ~RGWStreamReadResourceCRF() = default;
430};
431
432class RGWStreamWriteResourceCRF {
433protected:
434 boost::asio::coroutine write_state;
435 boost::asio::coroutine drain_state;
436
437public:
438 virtual int init() = 0;
439 virtual void send_ready(const rgw_rest_obj& rest_obj) = 0;
440 virtual int send() = 0;
441 virtual int write(bufferlist& data, bool *need_retry) = 0; /* reentrant */
442 virtual int drain_writes(bool *need_retry) = 0; /* reentrant */
443
444 virtual ~RGWStreamWriteResourceCRF() = default;
445};
446
447class RGWStreamReadHTTPResourceCRF : public RGWStreamReadResourceCRF {
448 CephContext *cct;
449 RGWCoroutinesEnv *env;
450 RGWCoroutine *caller;
451 RGWHTTPManager *http_manager;
452
453 RGWHTTPStreamRWRequest *req{nullptr};
454
455 std::optional<RGWCRHTTPGetDataCB> in_cb;
456
457 bufferlist extra_data;
458
459 bool got_attrs{false};
460 bool got_extra_data{false};
461
462 rgw_io_id io_read_mask;
463
464protected:
465 rgw_rest_obj rest_obj;
466
467 struct range_info {
468 bool is_set{false};
469 uint64_t ofs;
470 uint64_t size;
471 } range;
472
473 ceph::real_time mtime;
474 string etag;
475
476public:
477 RGWStreamReadHTTPResourceCRF(CephContext *_cct,
478 RGWCoroutinesEnv *_env,
479 RGWCoroutine *_caller,
480 RGWHTTPManager *_http_manager,
481 const rgw_obj_key& _src_key) : cct(_cct),
482 env(_env),
483 caller(_caller),
484 http_manager(_http_manager) {
485 rest_obj.init(_src_key);
486 }
487 ~RGWStreamReadHTTPResourceCRF();
488
489 int init() override;
490 int read(bufferlist *data, uint64_t max, bool *need_retry) override; /* reentrant */
491 int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data) override;
492 bool has_attrs() override;
493 void get_attrs(std::map<string, string> *attrs) override;
494 bool is_done();
495 virtual bool need_extra_data() { return false; }
496
497 void set_req(RGWHTTPStreamRWRequest *r) {
498 req = r;
499 }
500
501 rgw_rest_obj& get_rest_obj() {
502 return rest_obj;
503 }
504
505 void set_range(uint64_t ofs, uint64_t size) {
506 range.is_set = true;
507 range.ofs = ofs;
508 range.size = size;
509 }
510};
511
512class RGWStreamWriteHTTPResourceCRF : public RGWStreamWriteResourceCRF {
513protected:
514 RGWCoroutinesEnv *env;
515 RGWCoroutine *caller;
516 RGWHTTPManager *http_manager;
517
518 using lock_guard = std::lock_guard<std::mutex>;
519
520 std::mutex blocked_lock;
521 bool is_blocked;
522
523 RGWHTTPStreamRWRequest *req{nullptr};
524
525 struct multipart_info {
526 bool is_multipart{false};
527 string upload_id;
528 int part_num{0};
529 uint64_t part_size;
530 } multipart;
531
532 class WriteDrainNotify : public RGWWriteDrainCB {
533 RGWStreamWriteHTTPResourceCRF *crf;
534 public:
535 explicit WriteDrainNotify(RGWStreamWriteHTTPResourceCRF *_crf) : crf(_crf) {}
536 void notify(uint64_t pending_size) override;
537 } write_drain_notify_cb;
538
539public:
540 RGWStreamWriteHTTPResourceCRF(CephContext *_cct,
541 RGWCoroutinesEnv *_env,
542 RGWCoroutine *_caller,
543 RGWHTTPManager *_http_manager) : env(_env),
544 caller(_caller),
545 http_manager(_http_manager),
546 write_drain_notify_cb(this) {}
547 virtual ~RGWStreamWriteHTTPResourceCRF();
548
549 int init() override {
550 return 0;
551 }
552 void send_ready(const rgw_rest_obj& rest_obj) override;
553 int send() override;
554 int write(bufferlist& data, bool *need_retry) override; /* reentrant */
555 void write_drain_notify(uint64_t pending_size);
556 int drain_writes(bool *need_retry) override; /* reentrant */
557
558 virtual void handle_headers(const std::map<string, string>& headers) {}
559
560 void set_req(RGWHTTPStreamRWRequest *r) {
561 req = r;
562 }
563
564 void set_multipart(const string& upload_id, int part_num, uint64_t part_size) {
565 multipart.is_multipart = true;
566 multipart.upload_id = upload_id;
567 multipart.part_num = part_num;
568 multipart.part_size = part_size;
569 }
570};
571
572class RGWStreamSpliceCR : public RGWCoroutine {
573 CephContext *cct;
574 RGWHTTPManager *http_manager;
575 string url;
576 std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf;
577 std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf;
578 bufferlist bl;
579 bool need_retry{false};
580 bool sent_attrs{false};
581 uint64_t total_read{0};
582 int ret{0};
583public:
584 RGWStreamSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr,
585 std::shared_ptr<RGWStreamReadHTTPResourceCRF>& _in_crf,
586 std::shared_ptr<RGWStreamWriteHTTPResourceCRF>& _out_crf);
587 ~RGWStreamSpliceCR();
588
589 int operate() override;
590};