1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
6 #include <boost/intrusive_ptr.hpp>
8 #include "include/ceph_assert.h" // boost header clobbers our assert.h
10 #include "rgw_coroutine.h"
11 #include "rgw_rest_conn.h"
17 std::map
<string
, string
> attrs
;
18 std::map
<string
, string
> custom_attrs
;
19 RGWAccessControlPolicy acls
;
21 void init(const rgw_obj_key
& _key
) {
26 class RGWReadRawRESTResourceCR
: public RGWSimpleCoroutine
{
30 RGWHTTPManager
*http_manager
;
33 param_vec_t extra_headers
;
35 boost::intrusive_ptr
<RGWRESTReadResource
> http_op
;
36 RGWReadRawRESTResourceCR(CephContext
*_cct
, RGWRESTConn
*_conn
,
37 RGWHTTPManager
*_http_manager
, const string
& _path
,
38 rgw_http_param_pair
*params
, bufferlist
*_result
)
39 : RGWSimpleCoroutine(_cct
), result(_result
), conn(_conn
), http_manager(_http_manager
),
40 path(_path
), params(make_param_list(params
))
43 RGWReadRawRESTResourceCR(CephContext
*_cct
, RGWRESTConn
*_conn
,
44 RGWHTTPManager
*_http_manager
, const string
& _path
,
45 rgw_http_param_pair
*params
)
46 : RGWSimpleCoroutine(_cct
), conn(_conn
), http_manager(_http_manager
),
47 path(_path
), params(make_param_list(params
))
50 RGWReadRawRESTResourceCR(CephContext
*_cct
, RGWRESTConn
*_conn
,
51 RGWHTTPManager
*_http_manager
, const string
& _path
,
52 rgw_http_param_pair
*params
, param_vec_t
&hdrs
)
53 : RGWSimpleCoroutine(_cct
), conn(_conn
), http_manager(_http_manager
),
54 path(_path
), params(make_param_list(params
)),
58 RGWReadRawRESTResourceCR(CephContext
*_cct
, RGWRESTConn
*_conn
,
59 RGWHTTPManager
*_http_manager
, const string
& _path
,
60 rgw_http_param_pair
*params
,
61 std::map
<std::string
, std::string
> *hdrs
)
62 : RGWSimpleCoroutine(_cct
), conn(_conn
), http_manager(_http_manager
),
63 path(_path
), params(make_param_list(params
)),
64 extra_headers(make_param_list(hdrs
))
68 ~RGWReadRawRESTResourceCR() override
{
72 int send_request() override
{
73 auto op
= boost::intrusive_ptr
<RGWRESTReadResource
>(
74 new RGWRESTReadResource(conn
, path
, params
, &extra_headers
, http_manager
));
76 init_new_io(op
.get());
78 int ret
= op
->aio_read();
80 log_error() << "failed to send http operation: " << op
->to_str()
81 << " ret=" << ret
<< std::endl
;
85 std::swap(http_op
, op
); // store reference in http_op on success
91 virtual int wait_result() {
92 return http_op
->wait(result
, null_yield
);
95 int request_complete() override
{
100 auto op
= std::move(http_op
); // release ref on return
102 error_stream
<< "http operation failed: " << op
->to_str()
103 << " status=" << op
->get_http_status() << std::endl
;
111 void request_cleanup() override
{
122 class RGWReadRESTResourceCR
: public RGWReadRawRESTResourceCR
{
125 RGWReadRESTResourceCR(CephContext
*_cct
, RGWRESTConn
*_conn
,
126 RGWHTTPManager
*_http_manager
, const string
& _path
,
127 rgw_http_param_pair
*params
, T
*_result
)
128 : RGWReadRawRESTResourceCR(_cct
, _conn
, _http_manager
, _path
, params
), result(_result
)
131 RGWReadRESTResourceCR(CephContext
*_cct
, RGWRESTConn
*_conn
,
132 RGWHTTPManager
*_http_manager
, const string
& _path
,
133 rgw_http_param_pair
*params
,
134 std::map
<std::string
, std::string
> *hdrs
,
136 : RGWReadRawRESTResourceCR(_cct
, _conn
, _http_manager
, _path
, params
, hdrs
), result(_result
)
139 int wait_result() override
{
140 return http_op
->wait(result
, null_yield
);
145 template <class T
, class E
= int>
146 class RGWSendRawRESTResourceCR
: public RGWSimpleCoroutine
{
149 RGWHTTPManager
*http_manager
;
154 map
<string
, string
> *attrs
;
158 bool send_content_length
=false;
159 boost::intrusive_ptr
<RGWRESTSendResource
> http_op
;
// Constructor overload that takes a caller-supplied payload (_input).
// The initializer list:
//   - passes _cct to the RGWSimpleCoroutine base;
//   - stores _conn, _http_manager, _method and _path as given;
//   - converts _params and _attrs through make_param_list() into
//     params and headers respectively;
//   - additionally keeps the _attrs pointer itself in attrs;
//   - records the _result and _err_result destinations (_err_result
//     defaults to nullptr when the caller omits it);
//   - initializes input_bl from _input and send_content_length from
//     _send_content_length.
// The constructor body is empty.
// NOTE(review): whether input_bl copies _input or merely refers to it depends
// on the member's declaration, which is not visible here — confirm.
162 RGWSendRawRESTResourceCR(CephContext
*_cct
, RGWRESTConn
*_conn
,
163 RGWHTTPManager
*_http_manager
,
164 const string
& _method
, const string
& _path
,
165 rgw_http_param_pair
*_params
,
166 map
<string
, string
> *_attrs
,
167 bufferlist
& _input
, T
*_result
,
168 bool _send_content_length
,
169 E
*_err_result
= nullptr)
170 : RGWSimpleCoroutine(_cct
), conn(_conn
), http_manager(_http_manager
),
171 method(_method
), path(_path
), params(make_param_list(_params
)),
172 headers(make_param_list(_attrs
)), attrs(_attrs
),
173 result(_result
), err_result(_err_result
),
174 input_bl(_input
), send_content_length(_send_content_length
) {}
// Constructor overload without a payload argument and without a
// send_content_length argument.
// The initializer list passes _cct to the RGWSimpleCoroutine base, stores
// _conn, _http_manager, _method and _path, converts _params and _attrs through
// make_param_list() into params and headers, keeps the _attrs pointer in attrs,
// and records _result and _err_result (the latter defaults to nullptr).
// input_bl and send_content_length are not mentioned in this initializer list,
// so they are not set from any argument here.
// The constructor body is empty.
176 RGWSendRawRESTResourceCR(CephContext
*_cct
, RGWRESTConn
*_conn
,
177 RGWHTTPManager
*_http_manager
,
178 const string
& _method
, const string
& _path
,
179 rgw_http_param_pair
*_params
, map
<string
, string
> *_attrs
,
180 T
*_result
, E
*_err_result
= nullptr)
181 : RGWSimpleCoroutine(_cct
), conn(_conn
), http_manager(_http_manager
),
182 method(_method
), path(_path
), params(make_param_list(_params
)), headers(make_param_list(_attrs
)), attrs(_attrs
), result(_result
),
183 err_result(_err_result
) {}
185 ~RGWSendRawRESTResourceCR() override
{
189 int send_request() override
{
190 auto op
= boost::intrusive_ptr
<RGWRESTSendResource
>(
191 new RGWRESTSendResource(conn
, method
, path
, params
, &headers
, http_manager
));
193 init_new_io(op
.get());
195 int ret
= op
->aio_send(input_bl
);
197 lsubdout(cct
, rgw
, 0) << "ERROR: failed to send request" << dendl
;
201 std::swap(http_op
, op
); // store reference in http_op on success
205 int request_complete() override
{
207 if (result
|| err_result
) {
208 ret
= http_op
->wait(result
, null_yield
, err_result
);
211 ret
= http_op
->wait(&bl
, null_yield
);
213 auto op
= std::move(http_op
); // release ref on return
215 error_stream
<< "http operation failed: " << op
->to_str()
216 << " status=" << op
->get_http_status() << std::endl
;
217 lsubdout(cct
, rgw
, 5) << "failed to wait for op, ret=" << ret
218 << ": " << op
->to_str() << dendl
;
226 void request_cleanup() override
{
234 template <class S
, class T
, class E
= int>
235 class RGWSendRESTResourceCR
: public RGWSendRawRESTResourceCR
<T
, E
> {
237 RGWSendRESTResourceCR(CephContext
*_cct
, RGWRESTConn
*_conn
,
238 RGWHTTPManager
*_http_manager
,
239 const string
& _method
, const string
& _path
,
240 rgw_http_param_pair
*_params
, map
<string
, string
> *_attrs
,
241 S
& _input
, T
*_result
, E
*_err_result
= nullptr)
242 : RGWSendRawRESTResourceCR
<T
, E
>(_cct
, _conn
, _http_manager
, _method
, _path
, _params
, _attrs
, _result
, _err_result
) {
245 encode_json("data", _input
, &jf
);
246 std::stringstream ss
;
249 this->input_bl
.append(ss
.str());
254 template <class S
, class T
, class E
= int>
255 class RGWPostRESTResourceCR
: public RGWSendRESTResourceCR
<S
, T
, E
> {
257 RGWPostRESTResourceCR(CephContext
*_cct
, RGWRESTConn
*_conn
,
258 RGWHTTPManager
*_http_manager
,
260 rgw_http_param_pair
*_params
, S
& _input
,
261 T
*_result
, E
*_err_result
= nullptr)
262 : RGWSendRESTResourceCR
<S
, T
, E
>(_cct
, _conn
, _http_manager
,
264 _params
, nullptr, _input
,
265 _result
, _err_result
) {}
268 template <class T
, class E
= int>
269 class RGWPutRawRESTResourceCR
: public RGWSendRawRESTResourceCR
<T
, E
> {
271 RGWPutRawRESTResourceCR(CephContext
*_cct
, RGWRESTConn
*_conn
,
272 RGWHTTPManager
*_http_manager
,
274 rgw_http_param_pair
*_params
, bufferlist
& _input
,
275 T
*_result
, E
*_err_result
= nullptr)
276 : RGWSendRawRESTResourceCR
<T
, E
>(_cct
, _conn
, _http_manager
, "PUT", _path
,
277 _params
, nullptr, _input
, _result
, true, _err_result
) {}
281 template <class T
, class E
= int>
282 class RGWPostRawRESTResourceCR
: public RGWSendRawRESTResourceCR
<T
, E
> {
284 RGWPostRawRESTResourceCR(CephContext
*_cct
, RGWRESTConn
*_conn
,
285 RGWHTTPManager
*_http_manager
,
287 rgw_http_param_pair
*_params
,
288 map
<string
, string
> * _attrs
,
290 T
*_result
, E
*_err_result
= nullptr)
291 : RGWSendRawRESTResourceCR
<T
, E
>(_cct
, _conn
, _http_manager
, "POST", _path
,
292 _params
, _attrs
, _input
, _result
, true, _err_result
) {}
297 template <class S
, class T
, class E
= int>
298 class RGWPutRESTResourceCR
: public RGWSendRESTResourceCR
<S
, T
, E
> {
300 RGWPutRESTResourceCR(CephContext
*_cct
, RGWRESTConn
*_conn
,
301 RGWHTTPManager
*_http_manager
,
303 rgw_http_param_pair
*_params
, S
& _input
,
304 T
*_result
, E
*_err_result
= nullptr)
305 : RGWSendRESTResourceCR
<S
, T
, E
>(_cct
, _conn
, _http_manager
,
307 _params
, nullptr, _input
,
308 _result
, _err_result
) {}
310 RGWPutRESTResourceCR(CephContext
*_cct
, RGWRESTConn
*_conn
,
311 RGWHTTPManager
*_http_manager
,
313 rgw_http_param_pair
*_params
,
314 map
<string
, string
> *_attrs
,
315 S
& _input
, T
*_result
, E
*_err_result
= nullptr)
316 : RGWSendRESTResourceCR
<S
, T
, E
>(_cct
, _conn
, _http_manager
,
318 _params
, _attrs
, _input
,
319 _result
, _err_result
) {}
323 class RGWDeleteRESTResourceCR
: public RGWSimpleCoroutine
{
325 RGWHTTPManager
*http_manager
;
329 boost::intrusive_ptr
<RGWRESTDeleteResource
> http_op
;
332 RGWDeleteRESTResourceCR(CephContext
*_cct
, RGWRESTConn
*_conn
,
333 RGWHTTPManager
*_http_manager
,
335 rgw_http_param_pair
*_params
)
336 : RGWSimpleCoroutine(_cct
), conn(_conn
), http_manager(_http_manager
),
337 path(_path
), params(make_param_list(_params
))
340 ~RGWDeleteRESTResourceCR() override
{
344 int send_request() override
{
345 auto op
= boost::intrusive_ptr
<RGWRESTDeleteResource
>(
346 new RGWRESTDeleteResource(conn
, path
, params
, nullptr, http_manager
));
348 init_new_io(op
.get());
352 int ret
= op
->aio_send(bl
);
354 lsubdout(cct
, rgw
, 0) << "ERROR: failed to send DELETE request" << dendl
;
358 std::swap(http_op
, op
); // store reference in http_op on success
362 int request_complete() override
{
365 ret
= http_op
->wait(&bl
, null_yield
);
366 auto op
= std::move(http_op
); // release ref on return
368 error_stream
<< "http operation failed: " << op
->to_str()
369 << " status=" << op
->get_http_status() << std::endl
;
370 lsubdout(cct
, rgw
, 5) << "failed to wait for op, ret=" << ret
371 << ": " << op
->to_str() << dendl
;
379 void request_cleanup() override
{
387 class RGWCRHTTPGetDataCB
: public RGWHTTPStreamRWRequest::ReceiveCB
{
388 ceph::mutex lock
= ceph::make_mutex("RGWCRHTTPGetDataCB");
389 RGWCoroutinesEnv
*env
;
391 RGWHTTPStreamRWRequest
*req
;
394 bufferlist extra_data
;
395 bool got_all_extra_data
{false};
397 bool notified
{false};
// Constructor: takes the coroutine environment, a coroutine and the streaming
// HTTP request. Declared only; the definition is not in this view.
// NOTE(review): presumably _cr is the coroutine to wake when data arrives —
// confirm against the definition.
399 RGWCRHTTPGetDataCB(RGWCoroutinesEnv
*_env
, RGWCoroutine
*_cr
, RGWHTTPStreamRWRequest
*_req
);
// Override of the receive callback from RGWHTTPStreamRWRequest::ReceiveCB.
// Takes the received buffer and a bool out-parameter named pause; returns int.
// NOTE(review): return-value convention (0 / negative error) is not visible
// here — confirm.
401 int handle_data(bufferlist
& bl
, bool *pause
) override
;
// Non-virtual helper taking a destination buffer and a max value.
// NOTE(review): presumably moves up to `max` bytes of buffered data into
// *dest — confirm against the definition.
403 void claim_data(bufferlist
*dest
, uint64_t max
);
405 bufferlist
& get_extra_data() {
410 return (data
.length() > 0);
413 bool has_all_extra_data() {
414 return got_all_extra_data
;
419 class RGWStreamReadResourceCRF
{
421 boost::asio::coroutine read_state
;
// Pure-virtual operations of the read-side stream interface. Every function
// below is declared `= 0`, so a concrete subclass must implement all of them.

// One-time setup hook; returns int.
424 virtual int init() = 0;
// Marked reentrant by the original author.
// NOTE(review): presumably fills *data with at most `max` bytes and sets
// *need_retry when the caller has to call again — confirm against an
// implementation.
425 virtual int read(bufferlist
*data
, uint64_t max
, bool *need_retry
) = 0; /* reentrant */
// Receives a header map and an extra-data buffer, both by non-const reference.
// NOTE(review): which of the two are inputs vs. outputs is not visible here.
426 virtual int decode_rest_obj(map
<string
, string
>& headers
, bufferlist
& extra_data
) = 0;
// Boolean query paired with get_attrs() below.
427 virtual bool has_attrs() = 0;
// Takes a pointer to a string->string map.
// NOTE(review): presumably an out-parameter that receives the attributes —
// confirm.
428 virtual void get_attrs(std::map
<string
, string
> *attrs
) = 0;
// Defaulted virtual destructor.
429 virtual ~RGWStreamReadResourceCRF() = default;
432 class RGWStreamWriteResourceCRF
{
434 boost::asio::coroutine write_state
;
435 boost::asio::coroutine drain_state
;
// Pure-virtual operations of the write-side stream interface. Every function
// below is declared `= 0`, so a concrete subclass must implement all of them.

// One-time setup hook; returns int.
438 virtual int init() = 0;
// Receives the rgw_rest_obj by const reference.
// NOTE(review): presumably called once the source object's metadata is
// known, before send() — confirm the call order with the caller.
439 virtual void send_ready(const rgw_rest_obj
& rest_obj
) = 0;
// Returns int; takes no arguments.
440 virtual int send() = 0;
// Marked reentrant by the original author. Takes the data buffer by non-const
// reference plus a need_retry out-flag.
441 virtual int write(bufferlist
& data
, bool *need_retry
) = 0; /* reentrant */
// Marked reentrant by the original author. Takes only a need_retry out-flag.
442 virtual int drain_writes(bool *need_retry
) = 0; /* reentrant */
// Defaulted virtual destructor.
444 virtual ~RGWStreamWriteResourceCRF() = default;
447 class RGWStreamReadHTTPResourceCRF
: public RGWStreamReadResourceCRF
{
449 RGWCoroutinesEnv
*env
;
450 RGWCoroutine
*caller
;
451 RGWHTTPManager
*http_manager
;
453 RGWHTTPStreamRWRequest
*req
{nullptr};
455 std::optional
<RGWCRHTTPGetDataCB
> in_cb
;
457 bufferlist extra_data
;
459 bool got_attrs
{false};
460 bool got_extra_data
{false};
462 rgw_io_id io_read_mask
;
465 rgw_rest_obj rest_obj
;
473 ceph::real_time mtime
;
477 RGWStreamReadHTTPResourceCRF(CephContext
*_cct
,
478 RGWCoroutinesEnv
*_env
,
479 RGWCoroutine
*_caller
,
480 RGWHTTPManager
*_http_manager
,
481 const rgw_obj_key
& _src_key
) : cct(_cct
),
484 http_manager(_http_manager
) {
485 rest_obj
.init(_src_key
);
487 ~RGWStreamReadHTTPResourceCRF();
// Overrides of the RGWStreamReadResourceCRF interface. All four are only
// declared here (each ends in `;`); their definitions are not in this view.

// Marked reentrant by the original author.
490 int read(bufferlist
*data
, uint64_t max
, bool *need_retry
) override
; /* reentrant */
// Note the extra_data parameter has the same name as a member declared
// earlier in this class, so inside the definition the parameter shadows it.
491 int decode_rest_obj(map
<string
, string
>& headers
, bufferlist
& extra_data
) override
;
492 bool has_attrs() override
;
493 void get_attrs(std::map
<string
, string
> *attrs
) override
;
// Virtual hook; this class's version always returns false.
// NOTE(review): presumably subclasses override it to request extra data —
// confirm against the derived classes.
495 virtual bool need_extra_data() { return false; }
497 void set_req(RGWHTTPStreamRWRequest
*r
) {
501 rgw_rest_obj
& get_rest_obj() {
505 void set_range(uint64_t ofs
, uint64_t size
) {
512 class RGWStreamWriteHTTPResourceCRF
: public RGWStreamWriteResourceCRF
{
514 RGWCoroutinesEnv
*env
;
515 RGWCoroutine
*caller
;
516 RGWHTTPManager
*http_manager
;
518 using lock_guard
= std::lock_guard
<std::mutex
>;
520 std::mutex blocked_lock
;
523 RGWHTTPStreamRWRequest
*req
{nullptr};
525 struct multipart_info
{
526 bool is_multipart
{false};
// Nested callback type deriving from RGWWriteDrainCB. It holds a raw,
// pointer back to an RGWStreamWriteHTTPResourceCRF, set once by
// the explicit constructor. The class definition also declares the member
// object write_drain_notify_cb in the same statement.
532 class WriteDrainNotify
: public RGWWriteDrainCB
{
533 RGWStreamWriteHTTPResourceCRF
*crf
;
// explicit: prevents implicit conversion from a CRF pointer.
535 explicit WriteDrainNotify(RGWStreamWriteHTTPResourceCRF
*_crf
) : crf(_crf
) {}
// Override declared here, defined elsewhere.
// NOTE(review): presumably forwards pending_size to
// crf->write_drain_notify() — confirm against the definition.
536 void notify(uint64_t pending_size
) override
;
537 } write_drain_notify_cb
;
540 RGWStreamWriteHTTPResourceCRF(CephContext
*_cct
,
541 RGWCoroutinesEnv
*_env
,
542 RGWCoroutine
*_caller
,
543 RGWHTTPManager
*_http_manager
) : env(_env
),
545 http_manager(_http_manager
),
546 write_drain_notify_cb(this) {}
547 virtual ~RGWStreamWriteHTTPResourceCRF();
549 int init() override
{
552 void send_ready(const rgw_rest_obj
& rest_obj
) override
;
554 int write(bufferlist
& data
, bool *need_retry
) override
; /* reentrant */
555 void write_drain_notify(uint64_t pending_size
);
556 int drain_writes(bool *need_retry
) override
; /* reentrant */
// Virtual hook with an empty body: this class's version does nothing with
// the header map it receives.
558 virtual void handle_headers(const std::map
<string
, string
>& headers
) {}
560 void set_req(RGWHTTPStreamRWRequest
*r
) {
564 void set_multipart(const string
& upload_id
, int part_num
, uint64_t part_size
) {
565 multipart
.is_multipart
= true;
566 multipart
.upload_id
= upload_id
;
567 multipart
.part_num
= part_num
;
568 multipart
.part_size
= part_size
;
572 class RGWStreamSpliceCR
: public RGWCoroutine
{
574 RGWHTTPManager
*http_manager
;
576 std::shared_ptr
<RGWStreamReadHTTPResourceCRF
> in_crf
;
577 std::shared_ptr
<RGWStreamWriteHTTPResourceCRF
> out_crf
;
579 bool need_retry
{false};
580 bool sent_attrs
{false};
581 uint64_t total_read
{0};
// Constructor: takes the context, an HTTP manager and the read-side and
// write-side CRF objects. Both shared_ptr arguments are taken by non-const
// lvalue reference, so callers must pass named shared_ptr variables.
// Declared only; the definition is not in this view.
584 RGWStreamSpliceCR(CephContext
*_cct
, RGWHTTPManager
*_mgr
,
585 std::shared_ptr
<RGWStreamReadHTTPResourceCRF
>& _in_crf
,
586 std::shared_ptr
<RGWStreamWriteHTTPResourceCRF
>& _out_crf
);
// Destructor declared here, defined elsewhere.
587 ~RGWStreamSpliceCR();
// Override of the coroutine entry point inherited from RGWCoroutine;
// defined elsewhere.
589 int operate() override
;