]>
Commit | Line | Data |
---|---|---|
11fdf7f2 | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
9f95a23c | 2 | // vim: ts=8 sw=2 smarttab ft=cpp |
11fdf7f2 | 3 | |
9f95a23c | 4 | #pragma once |
7c673cae FG |
5 | |
6 | #include <boost/intrusive_ptr.hpp> | |
11fdf7f2 TL |
7 | #include <mutex> |
8 | #include "include/ceph_assert.h" // boost header clobbers our assert.h | |
7c673cae FG |
9 | |
10 | #include "rgw_coroutine.h" | |
11 | #include "rgw_rest_conn.h" | |
12 | ||
11fdf7f2 TL |
13 | |
14 | struct rgw_rest_obj { | |
15 | rgw_obj_key key; | |
16 | uint64_t content_len; | |
20effc67 TL |
17 | std::map<std::string, std::string> attrs; |
18 | std::map<std::string, std::string> custom_attrs; | |
11fdf7f2 TL |
19 | RGWAccessControlPolicy acls; |
20 | ||
21 | void init(const rgw_obj_key& _key) { | |
22 | key = _key; | |
23 | } | |
24 | }; | |
25 | ||
26 | class RGWReadRawRESTResourceCR : public RGWSimpleCoroutine { | |
27 | bufferlist *result; | |
28 | protected: | |
7c673cae FG |
29 | RGWRESTConn *conn; |
30 | RGWHTTPManager *http_manager; | |
20effc67 | 31 | std::string path; |
7c673cae | 32 | param_vec_t params; |
a8e16298 TL |
33 | param_vec_t extra_headers; |
34 | public: | |
7c673cae | 35 | boost::intrusive_ptr<RGWRESTReadResource> http_op; |
11fdf7f2 | 36 | RGWReadRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, |
20effc67 | 37 | RGWHTTPManager *_http_manager, const std::string& _path, |
11fdf7f2 TL |
38 | rgw_http_param_pair *params, bufferlist *_result) |
39 | : RGWSimpleCoroutine(_cct), result(_result), conn(_conn), http_manager(_http_manager), | |
40 | path(_path), params(make_param_list(params)) | |
41 | {} | |
7c673cae | 42 | |
11fdf7f2 | 43 | RGWReadRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, |
20effc67 | 44 | RGWHTTPManager *_http_manager, const std::string& _path, |
11fdf7f2 TL |
45 | rgw_http_param_pair *params) |
46 | : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager), | |
47 | path(_path), params(make_param_list(params)) | |
48 | {} | |
49 | ||
50 | RGWReadRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, | |
20effc67 | 51 | RGWHTTPManager *_http_manager, const std::string& _path, |
11fdf7f2 | 52 | rgw_http_param_pair *params, param_vec_t &hdrs) |
7c673cae | 53 | : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager), |
11fdf7f2 TL |
54 | path(_path), params(make_param_list(params)), |
55 | extra_headers(hdrs) | |
7c673cae FG |
56 | {} |
57 | ||
11fdf7f2 | 58 | RGWReadRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, |
20effc67 | 59 | RGWHTTPManager *_http_manager, const std::string& _path, |
a8e16298 | 60 | rgw_http_param_pair *params, |
11fdf7f2 | 61 | std::map <std::string, std::string> *hdrs) |
a8e16298 TL |
62 | : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager), |
63 | path(_path), params(make_param_list(params)), | |
11fdf7f2 | 64 | extra_headers(make_param_list(hdrs)) |
a8e16298 TL |
65 | {} |
66 | ||
67 | ||
11fdf7f2 | 68 | ~RGWReadRawRESTResourceCR() override { |
7c673cae FG |
69 | request_cleanup(); |
70 | } | |
71 | ||
b3b6e05e | 72 | int send_request(const DoutPrefixProvider *dpp) override { |
7c673cae | 73 | auto op = boost::intrusive_ptr<RGWRESTReadResource>( |
a8e16298 | 74 | new RGWRESTReadResource(conn, path, params, &extra_headers, http_manager)); |
7c673cae | 75 | |
11fdf7f2 | 76 | init_new_io(op.get()); |
7c673cae | 77 | |
b3b6e05e | 78 | int ret = op->aio_read(dpp); |
7c673cae FG |
79 | if (ret < 0) { |
80 | log_error() << "failed to send http operation: " << op->to_str() | |
81 | << " ret=" << ret << std::endl; | |
82 | op->put(); | |
83 | return ret; | |
84 | } | |
85 | std::swap(http_op, op); // store reference in http_op on success | |
86 | return 0; | |
87 | } | |
88 | ||
11fdf7f2 TL |
89 | |
90 | ||
91 | virtual int wait_result() { | |
9f95a23c | 92 | return http_op->wait(result, null_yield); |
11fdf7f2 TL |
93 | } |
94 | ||
7c673cae | 95 | int request_complete() override { |
11fdf7f2 TL |
96 | int ret; |
97 | ||
98 | ret = wait_result(); | |
99 | ||
7c673cae FG |
100 | auto op = std::move(http_op); // release ref on return |
101 | if (ret < 0) { | |
102 | error_stream << "http operation failed: " << op->to_str() | |
11fdf7f2 | 103 | << " status=" << op->get_http_status() << std::endl; |
7c673cae FG |
104 | op->put(); |
105 | return ret; | |
106 | } | |
107 | op->put(); | |
108 | return 0; | |
109 | } | |
110 | ||
111 | void request_cleanup() override { | |
112 | if (http_op) { | |
113 | http_op->put(); | |
114 | http_op = NULL; | |
115 | } | |
116 | } | |
11fdf7f2 | 117 | |
7c673cae FG |
118 | }; |
119 | ||
11fdf7f2 TL |
120 | |
121 | template <class T> | |
122 | class RGWReadRESTResourceCR : public RGWReadRawRESTResourceCR { | |
123 | T *result; | |
124 | public: | |
125 | RGWReadRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, | |
20effc67 | 126 | RGWHTTPManager *_http_manager, const std::string& _path, |
11fdf7f2 TL |
127 | rgw_http_param_pair *params, T *_result) |
128 | : RGWReadRawRESTResourceCR(_cct, _conn, _http_manager, _path, params), result(_result) | |
129 | {} | |
130 | ||
131 | RGWReadRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, | |
20effc67 | 132 | RGWHTTPManager *_http_manager, const std::string& _path, |
11fdf7f2 TL |
133 | rgw_http_param_pair *params, |
134 | std::map <std::string, std::string> *hdrs, | |
135 | T *_result) | |
136 | : RGWReadRawRESTResourceCR(_cct, _conn, _http_manager, _path, params, hdrs), result(_result) | |
137 | {} | |
138 | ||
139 | int wait_result() override { | |
9f95a23c | 140 | return http_op->wait(result, null_yield); |
11fdf7f2 TL |
141 | } |
142 | ||
143 | }; | |
144 | ||
145 | template <class T, class E = int> | |
146 | class RGWSendRawRESTResourceCR: public RGWSimpleCoroutine { | |
147 | protected: | |
7c673cae FG |
148 | RGWRESTConn *conn; |
149 | RGWHTTPManager *http_manager; | |
20effc67 TL |
150 | std::string method; |
151 | std::string path; | |
7c673cae | 152 | param_vec_t params; |
a8e16298 | 153 | param_vec_t headers; |
20effc67 | 154 | std::map<std::string, std::string> *attrs; |
7c673cae | 155 | T *result; |
a8e16298 TL |
156 | E *err_result; |
157 | bufferlist input_bl; | |
11fdf7f2 | 158 | bool send_content_length=false; |
7c673cae FG |
159 | boost::intrusive_ptr<RGWRESTSendResource> http_op; |
160 | ||
11fdf7f2 TL |
161 | public: |
162 | RGWSendRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, | |
163 | RGWHTTPManager *_http_manager, | |
20effc67 | 164 | const std::string& _method, const std::string& _path, |
11fdf7f2 | 165 | rgw_http_param_pair *_params, |
20effc67 | 166 | std::map<std::string, std::string> *_attrs, |
11fdf7f2 TL |
167 | bufferlist& _input, T *_result, |
168 | bool _send_content_length, | |
169 | E *_err_result = nullptr) | |
170 | : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager), | |
171 | method(_method), path(_path), params(make_param_list(_params)), | |
172 | headers(make_param_list(_attrs)), attrs(_attrs), | |
173 | result(_result), err_result(_err_result), | |
174 | input_bl(_input), send_content_length(_send_content_length) {} | |
175 | ||
176 | RGWSendRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, | |
177 | RGWHTTPManager *_http_manager, | |
20effc67 TL |
178 | const std::string& _method, const std::string& _path, |
179 | rgw_http_param_pair *_params, std::map<std::string, std::string> *_attrs, | |
11fdf7f2 TL |
180 | T *_result, E *_err_result = nullptr) |
181 | : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager), | |
182 | method(_method), path(_path), params(make_param_list(_params)), headers(make_param_list(_attrs)), attrs(_attrs), result(_result), | |
183 | err_result(_err_result) {} | |
7c673cae | 184 | |
11fdf7f2 | 185 | ~RGWSendRawRESTResourceCR() override { |
7c673cae FG |
186 | request_cleanup(); |
187 | } | |
188 | ||
b3b6e05e | 189 | int send_request(const DoutPrefixProvider *dpp) override { |
7c673cae | 190 | auto op = boost::intrusive_ptr<RGWRESTSendResource>( |
a8e16298 | 191 | new RGWRESTSendResource(conn, method, path, params, &headers, http_manager)); |
7c673cae | 192 | |
11fdf7f2 | 193 | init_new_io(op.get()); |
7c673cae | 194 | |
b3b6e05e | 195 | int ret = op->aio_send(dpp, input_bl); |
7c673cae | 196 | if (ret < 0) { |
b3b6e05e | 197 | ldpp_subdout(dpp, rgw, 0) << "ERROR: failed to send request" << dendl; |
7c673cae FG |
198 | op->put(); |
199 | return ret; | |
200 | } | |
201 | std::swap(http_op, op); // store reference in http_op on success | |
202 | return 0; | |
203 | } | |
204 | ||
205 | int request_complete() override { | |
206 | int ret; | |
a8e16298 | 207 | if (result || err_result) { |
9f95a23c | 208 | ret = http_op->wait(result, null_yield, err_result); |
7c673cae FG |
209 | } else { |
210 | bufferlist bl; | |
9f95a23c | 211 | ret = http_op->wait(&bl, null_yield); |
7c673cae FG |
212 | } |
213 | auto op = std::move(http_op); // release ref on return | |
214 | if (ret < 0) { | |
215 | error_stream << "http operation failed: " << op->to_str() | |
216 | << " status=" << op->get_http_status() << std::endl; | |
217 | lsubdout(cct, rgw, 5) << "failed to wait for op, ret=" << ret | |
218 | << ": " << op->to_str() << dendl; | |
219 | op->put(); | |
220 | return ret; | |
221 | } | |
222 | op->put(); | |
223 | return 0; | |
224 | } | |
225 | ||
226 | void request_cleanup() override { | |
227 | if (http_op) { | |
228 | http_op->put(); | |
229 | http_op = NULL; | |
230 | } | |
231 | } | |
232 | }; | |
233 | ||
11fdf7f2 TL |
234 | template <class S, class T, class E = int> |
235 | class RGWSendRESTResourceCR : public RGWSendRawRESTResourceCR<T, E> { | |
236 | public: | |
237 | RGWSendRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, | |
238 | RGWHTTPManager *_http_manager, | |
20effc67 TL |
239 | const std::string& _method, const std::string& _path, |
240 | rgw_http_param_pair *_params, std::map<std::string, std::string> *_attrs, | |
11fdf7f2 TL |
241 | S& _input, T *_result, E *_err_result = nullptr) |
242 | : RGWSendRawRESTResourceCR<T, E>(_cct, _conn, _http_manager, _method, _path, _params, _attrs, _result, _err_result) { | |
243 | ||
244 | JSONFormatter jf; | |
245 | encode_json("data", _input, &jf); | |
246 | std::stringstream ss; | |
247 | jf.flush(ss); | |
248 | //bufferlist bl; | |
249 | this->input_bl.append(ss.str()); | |
250 | } | |
251 | ||
252 | }; | |
253 | ||
a8e16298 TL |
254 | template <class S, class T, class E = int> |
255 | class RGWPostRESTResourceCR : public RGWSendRESTResourceCR<S, T, E> { | |
7c673cae FG |
256 | public: |
257 | RGWPostRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, | |
258 | RGWHTTPManager *_http_manager, | |
20effc67 | 259 | const std::string& _path, |
a8e16298 TL |
260 | rgw_http_param_pair *_params, S& _input, |
261 | T *_result, E *_err_result = nullptr) | |
262 | : RGWSendRESTResourceCR<S, T, E>(_cct, _conn, _http_manager, | |
7c673cae | 263 | "POST", _path, |
11fdf7f2 TL |
264 | _params, nullptr, _input, |
265 | _result, _err_result) {} | |
266 | }; | |
267 | ||
268 | template <class T, class E = int> | |
269 | class RGWPutRawRESTResourceCR: public RGWSendRawRESTResourceCR <T, E> { | |
270 | public: | |
271 | RGWPutRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, | |
272 | RGWHTTPManager *_http_manager, | |
20effc67 | 273 | const std::string& _path, |
11fdf7f2 TL |
274 | rgw_http_param_pair *_params, bufferlist& _input, |
275 | T *_result, E *_err_result = nullptr) | |
276 | : RGWSendRawRESTResourceCR<T, E>(_cct, _conn, _http_manager, "PUT", _path, | |
277 | _params, nullptr, _input, _result, true, _err_result) {} | |
278 | ||
7c673cae FG |
279 | }; |
280 | ||
11fdf7f2 TL |
281 | template <class T, class E = int> |
282 | class RGWPostRawRESTResourceCR: public RGWSendRawRESTResourceCR <T, E> { | |
283 | public: | |
284 | RGWPostRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, | |
285 | RGWHTTPManager *_http_manager, | |
20effc67 | 286 | const std::string& _path, |
11fdf7f2 | 287 | rgw_http_param_pair *_params, |
20effc67 | 288 | std::map<std::string, std::string> * _attrs, |
11fdf7f2 TL |
289 | bufferlist& _input, |
290 | T *_result, E *_err_result = nullptr) | |
291 | : RGWSendRawRESTResourceCR<T, E>(_cct, _conn, _http_manager, "POST", _path, | |
292 | _params, _attrs, _input, _result, true, _err_result) {} | |
293 | ||
294 | }; | |
295 | ||
296 | ||
a8e16298 TL |
297 | template <class S, class T, class E = int> |
298 | class RGWPutRESTResourceCR : public RGWSendRESTResourceCR<S, T, E> { | |
7c673cae FG |
299 | public: |
300 | RGWPutRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, | |
301 | RGWHTTPManager *_http_manager, | |
20effc67 | 302 | const std::string& _path, |
a8e16298 TL |
303 | rgw_http_param_pair *_params, S& _input, |
304 | T *_result, E *_err_result = nullptr) | |
305 | : RGWSendRESTResourceCR<S, T, E>(_cct, _conn, _http_manager, | |
306 | "PUT", _path, | |
307 | _params, nullptr, _input, | |
308 | _result, _err_result) {} | |
309 | ||
310 | RGWPutRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, | |
311 | RGWHTTPManager *_http_manager, | |
20effc67 | 312 | const std::string& _path, |
a8e16298 | 313 | rgw_http_param_pair *_params, |
20effc67 | 314 | std::map<std::string, std::string> *_attrs, |
a8e16298 TL |
315 | S& _input, T *_result, E *_err_result = nullptr) |
316 | : RGWSendRESTResourceCR<S, T, E>(_cct, _conn, _http_manager, | |
317 | "PUT", _path, | |
318 | _params, _attrs, _input, | |
319 | _result, _err_result) {} | |
11fdf7f2 | 320 | |
7c673cae FG |
321 | }; |
322 | ||
323 | class RGWDeleteRESTResourceCR : public RGWSimpleCoroutine { | |
324 | RGWRESTConn *conn; | |
325 | RGWHTTPManager *http_manager; | |
20effc67 | 326 | std::string path; |
7c673cae FG |
327 | param_vec_t params; |
328 | ||
329 | boost::intrusive_ptr<RGWRESTDeleteResource> http_op; | |
330 | ||
331 | public: | |
332 | RGWDeleteRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, | |
333 | RGWHTTPManager *_http_manager, | |
20effc67 | 334 | const std::string& _path, |
7c673cae FG |
335 | rgw_http_param_pair *_params) |
336 | : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager), | |
337 | path(_path), params(make_param_list(_params)) | |
338 | {} | |
339 | ||
340 | ~RGWDeleteRESTResourceCR() override { | |
341 | request_cleanup(); | |
342 | } | |
343 | ||
b3b6e05e | 344 | int send_request(const DoutPrefixProvider *dpp) override { |
7c673cae FG |
345 | auto op = boost::intrusive_ptr<RGWRESTDeleteResource>( |
346 | new RGWRESTDeleteResource(conn, path, params, nullptr, http_manager)); | |
347 | ||
11fdf7f2 | 348 | init_new_io(op.get()); |
7c673cae FG |
349 | |
350 | bufferlist bl; | |
351 | ||
b3b6e05e | 352 | int ret = op->aio_send(dpp, bl); |
7c673cae | 353 | if (ret < 0) { |
b3b6e05e | 354 | ldpp_subdout(dpp, rgw, 0) << "ERROR: failed to send DELETE request" << dendl; |
7c673cae FG |
355 | op->put(); |
356 | return ret; | |
357 | } | |
358 | std::swap(http_op, op); // store reference in http_op on success | |
359 | return 0; | |
360 | } | |
361 | ||
362 | int request_complete() override { | |
363 | int ret; | |
364 | bufferlist bl; | |
9f95a23c | 365 | ret = http_op->wait(&bl, null_yield); |
7c673cae FG |
366 | auto op = std::move(http_op); // release ref on return |
367 | if (ret < 0) { | |
368 | error_stream << "http operation failed: " << op->to_str() | |
369 | << " status=" << op->get_http_status() << std::endl; | |
370 | lsubdout(cct, rgw, 5) << "failed to wait for op, ret=" << ret | |
371 | << ": " << op->to_str() << dendl; | |
372 | op->put(); | |
373 | return ret; | |
374 | } | |
375 | op->put(); | |
376 | return 0; | |
377 | } | |
378 | ||
379 | void request_cleanup() override { | |
380 | if (http_op) { | |
381 | http_op->put(); | |
382 | http_op = NULL; | |
383 | } | |
384 | } | |
385 | }; | |
386 | ||
11fdf7f2 | 387 | class RGWCRHTTPGetDataCB : public RGWHTTPStreamRWRequest::ReceiveCB { |
9f95a23c | 388 | ceph::mutex lock = ceph::make_mutex("RGWCRHTTPGetDataCB"); |
11fdf7f2 TL |
389 | RGWCoroutinesEnv *env; |
390 | RGWCoroutine *cr; | |
391 | RGWHTTPStreamRWRequest *req; | |
392 | rgw_io_id io_id; | |
393 | bufferlist data; | |
394 | bufferlist extra_data; | |
395 | bool got_all_extra_data{false}; | |
396 | bool paused{false}; | |
397 | bool notified{false}; | |
398 | public: | |
399 | RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr, RGWHTTPStreamRWRequest *_req); | |
400 | ||
401 | int handle_data(bufferlist& bl, bool *pause) override; | |
402 | ||
403 | void claim_data(bufferlist *dest, uint64_t max); | |
404 | ||
405 | bufferlist& get_extra_data() { | |
406 | return extra_data; | |
407 | } | |
408 | ||
409 | bool has_data() { | |
410 | return (data.length() > 0); | |
411 | } | |
412 | ||
413 | bool has_all_extra_data() { | |
414 | return got_all_extra_data; | |
415 | } | |
416 | }; | |
417 | ||
418 | ||
419 | class RGWStreamReadResourceCRF { | |
420 | protected: | |
421 | boost::asio::coroutine read_state; | |
422 | ||
423 | public: | |
b3b6e05e | 424 | virtual int init(const DoutPrefixProvider *dpp) = 0; |
20effc67 TL |
425 | virtual int read(const DoutPrefixProvider *dpp, bufferlist *data, uint64_t max, bool *need_retry) = 0; /* reentrant */ |
426 | virtual int decode_rest_obj(const DoutPrefixProvider *dpp, std::map<std::string, std::string>& headers, bufferlist& extra_data) = 0; | |
11fdf7f2 | 427 | virtual bool has_attrs() = 0; |
20effc67 | 428 | virtual void get_attrs(std::map<std::string, std::string> *attrs) = 0; |
11fdf7f2 TL |
429 | virtual ~RGWStreamReadResourceCRF() = default; |
430 | }; | |
431 | ||
432 | class RGWStreamWriteResourceCRF { | |
433 | protected: | |
434 | boost::asio::coroutine write_state; | |
435 | boost::asio::coroutine drain_state; | |
436 | ||
437 | public: | |
438 | virtual int init() = 0; | |
b3b6e05e | 439 | virtual void send_ready(const DoutPrefixProvider *dpp, const rgw_rest_obj& rest_obj) = 0; |
11fdf7f2 TL |
440 | virtual int send() = 0; |
441 | virtual int write(bufferlist& data, bool *need_retry) = 0; /* reentrant */ | |
442 | virtual int drain_writes(bool *need_retry) = 0; /* reentrant */ | |
1e59de90 | 443 | |
11fdf7f2 TL |
444 | virtual ~RGWStreamWriteResourceCRF() = default; |
445 | }; | |
446 | ||
447 | class RGWStreamReadHTTPResourceCRF : public RGWStreamReadResourceCRF { | |
448 | CephContext *cct; | |
449 | RGWCoroutinesEnv *env; | |
450 | RGWCoroutine *caller; | |
451 | RGWHTTPManager *http_manager; | |
452 | ||
453 | RGWHTTPStreamRWRequest *req{nullptr}; | |
454 | ||
455 | std::optional<RGWCRHTTPGetDataCB> in_cb; | |
456 | ||
457 | bufferlist extra_data; | |
458 | ||
459 | bool got_attrs{false}; | |
460 | bool got_extra_data{false}; | |
461 | ||
462 | rgw_io_id io_read_mask; | |
463 | ||
464 | protected: | |
465 | rgw_rest_obj rest_obj; | |
466 | ||
467 | struct range_info { | |
468 | bool is_set{false}; | |
469 | uint64_t ofs; | |
470 | uint64_t size; | |
471 | } range; | |
472 | ||
473 | ceph::real_time mtime; | |
20effc67 | 474 | std::string etag; |
11fdf7f2 TL |
475 | |
476 | public: | |
477 | RGWStreamReadHTTPResourceCRF(CephContext *_cct, | |
478 | RGWCoroutinesEnv *_env, | |
479 | RGWCoroutine *_caller, | |
480 | RGWHTTPManager *_http_manager, | |
481 | const rgw_obj_key& _src_key) : cct(_cct), | |
482 | env(_env), | |
483 | caller(_caller), | |
484 | http_manager(_http_manager) { | |
485 | rest_obj.init(_src_key); | |
486 | } | |
487 | ~RGWStreamReadHTTPResourceCRF(); | |
488 | ||
b3b6e05e | 489 | int init(const DoutPrefixProvider *dpp) override; |
20effc67 TL |
490 | int read(const DoutPrefixProvider *dpp, bufferlist *data, uint64_t max, bool *need_retry) override; /* reentrant */ |
491 | int decode_rest_obj(const DoutPrefixProvider *dpp, std::map<std::string, std::string>& headers, bufferlist& extra_data) override; | |
11fdf7f2 | 492 | bool has_attrs() override; |
20effc67 | 493 | void get_attrs(std::map<std::string, std::string> *attrs) override; |
11fdf7f2 TL |
494 | bool is_done(); |
495 | virtual bool need_extra_data() { return false; } | |
496 | ||
497 | void set_req(RGWHTTPStreamRWRequest *r) { | |
498 | req = r; | |
499 | } | |
500 | ||
501 | rgw_rest_obj& get_rest_obj() { | |
502 | return rest_obj; | |
503 | } | |
504 | ||
505 | void set_range(uint64_t ofs, uint64_t size) { | |
506 | range.is_set = true; | |
507 | range.ofs = ofs; | |
508 | range.size = size; | |
509 | } | |
510 | }; | |
511 | ||
512 | class RGWStreamWriteHTTPResourceCRF : public RGWStreamWriteResourceCRF { | |
513 | protected: | |
514 | RGWCoroutinesEnv *env; | |
515 | RGWCoroutine *caller; | |
516 | RGWHTTPManager *http_manager; | |
517 | ||
518 | using lock_guard = std::lock_guard<std::mutex>; | |
519 | ||
520 | std::mutex blocked_lock; | |
521 | bool is_blocked; | |
522 | ||
523 | RGWHTTPStreamRWRequest *req{nullptr}; | |
524 | ||
525 | struct multipart_info { | |
526 | bool is_multipart{false}; | |
20effc67 | 527 | std::string upload_id; |
11fdf7f2 TL |
528 | int part_num{0}; |
529 | uint64_t part_size; | |
530 | } multipart; | |
531 | ||
532 | class WriteDrainNotify : public RGWWriteDrainCB { | |
533 | RGWStreamWriteHTTPResourceCRF *crf; | |
534 | public: | |
535 | explicit WriteDrainNotify(RGWStreamWriteHTTPResourceCRF *_crf) : crf(_crf) {} | |
536 | void notify(uint64_t pending_size) override; | |
537 | } write_drain_notify_cb; | |
538 | ||
539 | public: | |
540 | RGWStreamWriteHTTPResourceCRF(CephContext *_cct, | |
541 | RGWCoroutinesEnv *_env, | |
542 | RGWCoroutine *_caller, | |
543 | RGWHTTPManager *_http_manager) : env(_env), | |
544 | caller(_caller), | |
545 | http_manager(_http_manager), | |
546 | write_drain_notify_cb(this) {} | |
547 | virtual ~RGWStreamWriteHTTPResourceCRF(); | |
548 | ||
549 | int init() override { | |
550 | return 0; | |
551 | } | |
b3b6e05e | 552 | void send_ready(const DoutPrefixProvider *dpp, const rgw_rest_obj& rest_obj) override; |
11fdf7f2 TL |
553 | int send() override; |
554 | int write(bufferlist& data, bool *need_retry) override; /* reentrant */ | |
555 | void write_drain_notify(uint64_t pending_size); | |
556 | int drain_writes(bool *need_retry) override; /* reentrant */ | |
557 | ||
20effc67 | 558 | virtual void handle_headers(const std::map<std::string, std::string>& headers) {} |
11fdf7f2 TL |
559 | |
560 | void set_req(RGWHTTPStreamRWRequest *r) { | |
561 | req = r; | |
562 | } | |
563 | ||
20effc67 | 564 | void set_multipart(const std::string& upload_id, int part_num, uint64_t part_size) { |
11fdf7f2 TL |
565 | multipart.is_multipart = true; |
566 | multipart.upload_id = upload_id; | |
567 | multipart.part_num = part_num; | |
568 | multipart.part_size = part_size; | |
569 | } | |
570 | }; | |
571 | ||
572 | class RGWStreamSpliceCR : public RGWCoroutine { | |
573 | CephContext *cct; | |
574 | RGWHTTPManager *http_manager; | |
20effc67 | 575 | std::string url; |
11fdf7f2 TL |
576 | std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf; |
577 | std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf; | |
578 | bufferlist bl; | |
579 | bool need_retry{false}; | |
580 | bool sent_attrs{false}; | |
581 | uint64_t total_read{0}; | |
582 | int ret{0}; | |
583 | public: | |
584 | RGWStreamSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr, | |
585 | std::shared_ptr<RGWStreamReadHTTPResourceCRF>& _in_crf, | |
586 | std::shared_ptr<RGWStreamWriteHTTPResourceCRF>& _out_crf); | |
587 | ~RGWStreamSpliceCR(); | |
588 | ||
b3b6e05e | 589 | int operate(const DoutPrefixProvider *dpp) override; |
11fdf7f2 | 590 | }; |