]>
Commit | Line | Data |
---|---|---|
11fdf7f2 TL |
1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
2 | // vim: ts=8 sw=2 smarttab | |
3 | ||
7c673cae FG |
4 | #ifndef CEPH_RGW_CR_REST_H |
5 | #define CEPH_RGW_CR_REST_H | |
6 | ||
7 | #include <boost/intrusive_ptr.hpp> | |
11fdf7f2 TL |
8 | #include <mutex> |
9 | #include "include/ceph_assert.h" // boost header clobbers our assert.h | |
7c673cae FG |
10 | |
11 | #include "rgw_coroutine.h" | |
12 | #include "rgw_rest_conn.h" | |
13 | ||
11fdf7f2 TL |
14 | |
15 | struct rgw_rest_obj { | |
16 | rgw_obj_key key; | |
17 | uint64_t content_len; | |
18 | std::map<string, string> attrs; | |
19 | std::map<string, string> custom_attrs; | |
20 | RGWAccessControlPolicy acls; | |
21 | ||
22 | void init(const rgw_obj_key& _key) { | |
23 | key = _key; | |
24 | } | |
25 | }; | |
26 | ||
27 | class RGWReadRawRESTResourceCR : public RGWSimpleCoroutine { | |
28 | bufferlist *result; | |
29 | protected: | |
7c673cae FG |
30 | RGWRESTConn *conn; |
31 | RGWHTTPManager *http_manager; | |
32 | string path; | |
33 | param_vec_t params; | |
a8e16298 TL |
34 | param_vec_t extra_headers; |
35 | public: | |
7c673cae | 36 | boost::intrusive_ptr<RGWRESTReadResource> http_op; |
11fdf7f2 TL |
37 | RGWReadRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, |
38 | RGWHTTPManager *_http_manager, const string& _path, | |
39 | rgw_http_param_pair *params, bufferlist *_result) | |
40 | : RGWSimpleCoroutine(_cct), result(_result), conn(_conn), http_manager(_http_manager), | |
41 | path(_path), params(make_param_list(params)) | |
42 | {} | |
7c673cae | 43 | |
11fdf7f2 TL |
44 | RGWReadRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, |
45 | RGWHTTPManager *_http_manager, const string& _path, | |
46 | rgw_http_param_pair *params) | |
47 | : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager), | |
48 | path(_path), params(make_param_list(params)) | |
49 | {} | |
50 | ||
51 | RGWReadRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, | |
52 | RGWHTTPManager *_http_manager, const string& _path, | |
53 | rgw_http_param_pair *params, param_vec_t &hdrs) | |
7c673cae | 54 | : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager), |
11fdf7f2 TL |
55 | path(_path), params(make_param_list(params)), |
56 | extra_headers(hdrs) | |
7c673cae FG |
57 | {} |
58 | ||
11fdf7f2 | 59 | RGWReadRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, |
a8e16298 TL |
60 | RGWHTTPManager *_http_manager, const string& _path, |
61 | rgw_http_param_pair *params, | |
11fdf7f2 | 62 | std::map <std::string, std::string> *hdrs) |
a8e16298 TL |
63 | : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager), |
64 | path(_path), params(make_param_list(params)), | |
11fdf7f2 | 65 | extra_headers(make_param_list(hdrs)) |
a8e16298 TL |
66 | {} |
67 | ||
68 | ||
11fdf7f2 | 69 | ~RGWReadRawRESTResourceCR() override { |
7c673cae FG |
70 | request_cleanup(); |
71 | } | |
72 | ||
73 | int send_request() override { | |
74 | auto op = boost::intrusive_ptr<RGWRESTReadResource>( | |
a8e16298 | 75 | new RGWRESTReadResource(conn, path, params, &extra_headers, http_manager)); |
7c673cae | 76 | |
11fdf7f2 | 77 | init_new_io(op.get()); |
7c673cae FG |
78 | |
79 | int ret = op->aio_read(); | |
80 | if (ret < 0) { | |
81 | log_error() << "failed to send http operation: " << op->to_str() | |
82 | << " ret=" << ret << std::endl; | |
83 | op->put(); | |
84 | return ret; | |
85 | } | |
86 | std::swap(http_op, op); // store reference in http_op on success | |
87 | return 0; | |
88 | } | |
89 | ||
11fdf7f2 TL |
90 | |
91 | ||
92 | virtual int wait_result() { | |
93 | return http_op->wait(result); | |
94 | } | |
95 | ||
7c673cae | 96 | int request_complete() override { |
11fdf7f2 TL |
97 | int ret; |
98 | ||
99 | ret = wait_result(); | |
100 | ||
7c673cae FG |
101 | auto op = std::move(http_op); // release ref on return |
102 | if (ret < 0) { | |
103 | error_stream << "http operation failed: " << op->to_str() | |
11fdf7f2 | 104 | << " status=" << op->get_http_status() << std::endl; |
7c673cae FG |
105 | op->put(); |
106 | return ret; | |
107 | } | |
108 | op->put(); | |
109 | return 0; | |
110 | } | |
111 | ||
112 | void request_cleanup() override { | |
113 | if (http_op) { | |
114 | http_op->put(); | |
115 | http_op = NULL; | |
116 | } | |
117 | } | |
11fdf7f2 | 118 | |
7c673cae FG |
119 | }; |
120 | ||
11fdf7f2 TL |
121 | |
122 | template <class T> | |
123 | class RGWReadRESTResourceCR : public RGWReadRawRESTResourceCR { | |
124 | T *result; | |
125 | public: | |
126 | RGWReadRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, | |
127 | RGWHTTPManager *_http_manager, const string& _path, | |
128 | rgw_http_param_pair *params, T *_result) | |
129 | : RGWReadRawRESTResourceCR(_cct, _conn, _http_manager, _path, params), result(_result) | |
130 | {} | |
131 | ||
132 | RGWReadRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, | |
133 | RGWHTTPManager *_http_manager, const string& _path, | |
134 | rgw_http_param_pair *params, | |
135 | std::map <std::string, std::string> *hdrs, | |
136 | T *_result) | |
137 | : RGWReadRawRESTResourceCR(_cct, _conn, _http_manager, _path, params, hdrs), result(_result) | |
138 | {} | |
139 | ||
140 | int wait_result() override { | |
141 | return http_op->wait(result); | |
142 | } | |
143 | ||
144 | }; | |
145 | ||
146 | template <class T, class E = int> | |
147 | class RGWSendRawRESTResourceCR: public RGWSimpleCoroutine { | |
148 | protected: | |
7c673cae FG |
149 | RGWRESTConn *conn; |
150 | RGWHTTPManager *http_manager; | |
151 | string method; | |
152 | string path; | |
153 | param_vec_t params; | |
a8e16298 | 154 | param_vec_t headers; |
11fdf7f2 | 155 | map<string, string> *attrs; |
7c673cae | 156 | T *result; |
a8e16298 TL |
157 | E *err_result; |
158 | bufferlist input_bl; | |
11fdf7f2 | 159 | bool send_content_length=false; |
7c673cae FG |
160 | boost::intrusive_ptr<RGWRESTSendResource> http_op; |
161 | ||
11fdf7f2 TL |
162 | public: |
163 | RGWSendRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, | |
164 | RGWHTTPManager *_http_manager, | |
165 | const string& _method, const string& _path, | |
166 | rgw_http_param_pair *_params, | |
167 | map<string, string> *_attrs, | |
168 | bufferlist& _input, T *_result, | |
169 | bool _send_content_length, | |
170 | E *_err_result = nullptr) | |
171 | : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager), | |
172 | method(_method), path(_path), params(make_param_list(_params)), | |
173 | headers(make_param_list(_attrs)), attrs(_attrs), | |
174 | result(_result), err_result(_err_result), | |
175 | input_bl(_input), send_content_length(_send_content_length) {} | |
176 | ||
177 | RGWSendRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, | |
178 | RGWHTTPManager *_http_manager, | |
179 | const string& _method, const string& _path, | |
180 | rgw_http_param_pair *_params, map<string, string> *_attrs, | |
181 | T *_result, E *_err_result = nullptr) | |
182 | : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager), | |
183 | method(_method), path(_path), params(make_param_list(_params)), headers(make_param_list(_attrs)), attrs(_attrs), result(_result), | |
184 | err_result(_err_result) {} | |
7c673cae | 185 | |
11fdf7f2 | 186 | ~RGWSendRawRESTResourceCR() override { |
7c673cae FG |
187 | request_cleanup(); |
188 | } | |
189 | ||
190 | int send_request() override { | |
191 | auto op = boost::intrusive_ptr<RGWRESTSendResource>( | |
a8e16298 | 192 | new RGWRESTSendResource(conn, method, path, params, &headers, http_manager)); |
7c673cae | 193 | |
11fdf7f2 | 194 | init_new_io(op.get()); |
7c673cae | 195 | |
a8e16298 | 196 | int ret = op->aio_send(input_bl); |
7c673cae FG |
197 | if (ret < 0) { |
198 | lsubdout(cct, rgw, 0) << "ERROR: failed to send request" << dendl; | |
199 | op->put(); | |
200 | return ret; | |
201 | } | |
202 | std::swap(http_op, op); // store reference in http_op on success | |
203 | return 0; | |
204 | } | |
205 | ||
206 | int request_complete() override { | |
207 | int ret; | |
a8e16298 TL |
208 | if (result || err_result) { |
209 | ret = http_op->wait(result, err_result); | |
7c673cae FG |
210 | } else { |
211 | bufferlist bl; | |
11fdf7f2 | 212 | ret = http_op->wait(&bl); |
7c673cae FG |
213 | } |
214 | auto op = std::move(http_op); // release ref on return | |
215 | if (ret < 0) { | |
216 | error_stream << "http operation failed: " << op->to_str() | |
217 | << " status=" << op->get_http_status() << std::endl; | |
218 | lsubdout(cct, rgw, 5) << "failed to wait for op, ret=" << ret | |
219 | << ": " << op->to_str() << dendl; | |
220 | op->put(); | |
221 | return ret; | |
222 | } | |
223 | op->put(); | |
224 | return 0; | |
225 | } | |
226 | ||
227 | void request_cleanup() override { | |
228 | if (http_op) { | |
229 | http_op->put(); | |
230 | http_op = NULL; | |
231 | } | |
232 | } | |
233 | }; | |
234 | ||
11fdf7f2 TL |
235 | template <class S, class T, class E = int> |
236 | class RGWSendRESTResourceCR : public RGWSendRawRESTResourceCR<T, E> { | |
237 | public: | |
238 | RGWSendRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, | |
239 | RGWHTTPManager *_http_manager, | |
240 | const string& _method, const string& _path, | |
241 | rgw_http_param_pair *_params, map<string, string> *_attrs, | |
242 | S& _input, T *_result, E *_err_result = nullptr) | |
243 | : RGWSendRawRESTResourceCR<T, E>(_cct, _conn, _http_manager, _method, _path, _params, _attrs, _result, _err_result) { | |
244 | ||
245 | JSONFormatter jf; | |
246 | encode_json("data", _input, &jf); | |
247 | std::stringstream ss; | |
248 | jf.flush(ss); | |
249 | //bufferlist bl; | |
250 | this->input_bl.append(ss.str()); | |
251 | } | |
252 | ||
253 | }; | |
254 | ||
a8e16298 TL |
255 | template <class S, class T, class E = int> |
256 | class RGWPostRESTResourceCR : public RGWSendRESTResourceCR<S, T, E> { | |
7c673cae FG |
257 | public: |
258 | RGWPostRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, | |
259 | RGWHTTPManager *_http_manager, | |
260 | const string& _path, | |
a8e16298 TL |
261 | rgw_http_param_pair *_params, S& _input, |
262 | T *_result, E *_err_result = nullptr) | |
263 | : RGWSendRESTResourceCR<S, T, E>(_cct, _conn, _http_manager, | |
7c673cae | 264 | "POST", _path, |
11fdf7f2 TL |
265 | _params, nullptr, _input, |
266 | _result, _err_result) {} | |
267 | }; | |
268 | ||
269 | template <class T, class E = int> | |
270 | class RGWPutRawRESTResourceCR: public RGWSendRawRESTResourceCR <T, E> { | |
271 | public: | |
272 | RGWPutRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, | |
273 | RGWHTTPManager *_http_manager, | |
274 | const string& _path, | |
275 | rgw_http_param_pair *_params, bufferlist& _input, | |
276 | T *_result, E *_err_result = nullptr) | |
277 | : RGWSendRawRESTResourceCR<T, E>(_cct, _conn, _http_manager, "PUT", _path, | |
278 | _params, nullptr, _input, _result, true, _err_result) {} | |
279 | ||
7c673cae FG |
280 | }; |
281 | ||
11fdf7f2 TL |
282 | template <class T, class E = int> |
283 | class RGWPostRawRESTResourceCR: public RGWSendRawRESTResourceCR <T, E> { | |
284 | public: | |
285 | RGWPostRawRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, | |
286 | RGWHTTPManager *_http_manager, | |
287 | const string& _path, | |
288 | rgw_http_param_pair *_params, | |
289 | map<string, string> * _attrs, | |
290 | bufferlist& _input, | |
291 | T *_result, E *_err_result = nullptr) | |
292 | : RGWSendRawRESTResourceCR<T, E>(_cct, _conn, _http_manager, "POST", _path, | |
293 | _params, _attrs, _input, _result, true, _err_result) {} | |
294 | ||
295 | }; | |
296 | ||
297 | ||
a8e16298 TL |
298 | template <class S, class T, class E = int> |
299 | class RGWPutRESTResourceCR : public RGWSendRESTResourceCR<S, T, E> { | |
7c673cae FG |
300 | public: |
301 | RGWPutRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, | |
302 | RGWHTTPManager *_http_manager, | |
303 | const string& _path, | |
a8e16298 TL |
304 | rgw_http_param_pair *_params, S& _input, |
305 | T *_result, E *_err_result = nullptr) | |
306 | : RGWSendRESTResourceCR<S, T, E>(_cct, _conn, _http_manager, | |
307 | "PUT", _path, | |
308 | _params, nullptr, _input, | |
309 | _result, _err_result) {} | |
310 | ||
311 | RGWPutRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, | |
312 | RGWHTTPManager *_http_manager, | |
313 | const string& _path, | |
314 | rgw_http_param_pair *_params, | |
315 | map <string, string> *_attrs, | |
316 | S& _input, T *_result, E *_err_result = nullptr) | |
317 | : RGWSendRESTResourceCR<S, T, E>(_cct, _conn, _http_manager, | |
318 | "PUT", _path, | |
319 | _params, _attrs, _input, | |
320 | _result, _err_result) {} | |
11fdf7f2 | 321 | |
7c673cae FG |
322 | }; |
323 | ||
324 | class RGWDeleteRESTResourceCR : public RGWSimpleCoroutine { | |
325 | RGWRESTConn *conn; | |
326 | RGWHTTPManager *http_manager; | |
327 | string path; | |
328 | param_vec_t params; | |
329 | ||
330 | boost::intrusive_ptr<RGWRESTDeleteResource> http_op; | |
331 | ||
332 | public: | |
333 | RGWDeleteRESTResourceCR(CephContext *_cct, RGWRESTConn *_conn, | |
334 | RGWHTTPManager *_http_manager, | |
335 | const string& _path, | |
336 | rgw_http_param_pair *_params) | |
337 | : RGWSimpleCoroutine(_cct), conn(_conn), http_manager(_http_manager), | |
338 | path(_path), params(make_param_list(_params)) | |
339 | {} | |
340 | ||
341 | ~RGWDeleteRESTResourceCR() override { | |
342 | request_cleanup(); | |
343 | } | |
344 | ||
345 | int send_request() override { | |
346 | auto op = boost::intrusive_ptr<RGWRESTDeleteResource>( | |
347 | new RGWRESTDeleteResource(conn, path, params, nullptr, http_manager)); | |
348 | ||
11fdf7f2 | 349 | init_new_io(op.get()); |
7c673cae FG |
350 | |
351 | bufferlist bl; | |
352 | ||
353 | int ret = op->aio_send(bl); | |
354 | if (ret < 0) { | |
355 | lsubdout(cct, rgw, 0) << "ERROR: failed to send DELETE request" << dendl; | |
356 | op->put(); | |
357 | return ret; | |
358 | } | |
359 | std::swap(http_op, op); // store reference in http_op on success | |
360 | return 0; | |
361 | } | |
362 | ||
363 | int request_complete() override { | |
364 | int ret; | |
365 | bufferlist bl; | |
11fdf7f2 | 366 | ret = http_op->wait(&bl); |
7c673cae FG |
367 | auto op = std::move(http_op); // release ref on return |
368 | if (ret < 0) { | |
369 | error_stream << "http operation failed: " << op->to_str() | |
370 | << " status=" << op->get_http_status() << std::endl; | |
371 | lsubdout(cct, rgw, 5) << "failed to wait for op, ret=" << ret | |
372 | << ": " << op->to_str() << dendl; | |
373 | op->put(); | |
374 | return ret; | |
375 | } | |
376 | op->put(); | |
377 | return 0; | |
378 | } | |
379 | ||
380 | void request_cleanup() override { | |
381 | if (http_op) { | |
382 | http_op->put(); | |
383 | http_op = NULL; | |
384 | } | |
385 | } | |
386 | }; | |
387 | ||
11fdf7f2 TL |
388 | class RGWCRHTTPGetDataCB : public RGWHTTPStreamRWRequest::ReceiveCB { |
389 | Mutex lock; | |
390 | RGWCoroutinesEnv *env; | |
391 | RGWCoroutine *cr; | |
392 | RGWHTTPStreamRWRequest *req; | |
393 | rgw_io_id io_id; | |
394 | bufferlist data; | |
395 | bufferlist extra_data; | |
396 | bool got_all_extra_data{false}; | |
397 | bool paused{false}; | |
398 | bool notified{false}; | |
399 | public: | |
400 | RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr, RGWHTTPStreamRWRequest *_req); | |
401 | ||
402 | int handle_data(bufferlist& bl, bool *pause) override; | |
403 | ||
404 | void claim_data(bufferlist *dest, uint64_t max); | |
405 | ||
406 | bufferlist& get_extra_data() { | |
407 | return extra_data; | |
408 | } | |
409 | ||
410 | bool has_data() { | |
411 | return (data.length() > 0); | |
412 | } | |
413 | ||
414 | bool has_all_extra_data() { | |
415 | return got_all_extra_data; | |
416 | } | |
417 | }; | |
418 | ||
419 | ||
420 | class RGWStreamReadResourceCRF { | |
421 | protected: | |
422 | boost::asio::coroutine read_state; | |
423 | ||
424 | public: | |
425 | virtual int init() = 0; | |
426 | virtual int read(bufferlist *data, uint64_t max, bool *need_retry) = 0; /* reentrant */ | |
427 | virtual int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data) = 0; | |
428 | virtual bool has_attrs() = 0; | |
429 | virtual void get_attrs(std::map<string, string> *attrs) = 0; | |
430 | virtual ~RGWStreamReadResourceCRF() = default; | |
431 | }; | |
432 | ||
433 | class RGWStreamWriteResourceCRF { | |
434 | protected: | |
435 | boost::asio::coroutine write_state; | |
436 | boost::asio::coroutine drain_state; | |
437 | ||
438 | public: | |
439 | virtual int init() = 0; | |
440 | virtual void send_ready(const rgw_rest_obj& rest_obj) = 0; | |
441 | virtual int send() = 0; | |
442 | virtual int write(bufferlist& data, bool *need_retry) = 0; /* reentrant */ | |
443 | virtual int drain_writes(bool *need_retry) = 0; /* reentrant */ | |
444 | ||
445 | virtual ~RGWStreamWriteResourceCRF() = default; | |
446 | }; | |
447 | ||
448 | class RGWStreamReadHTTPResourceCRF : public RGWStreamReadResourceCRF { | |
449 | CephContext *cct; | |
450 | RGWCoroutinesEnv *env; | |
451 | RGWCoroutine *caller; | |
452 | RGWHTTPManager *http_manager; | |
453 | ||
454 | RGWHTTPStreamRWRequest *req{nullptr}; | |
455 | ||
456 | std::optional<RGWCRHTTPGetDataCB> in_cb; | |
457 | ||
458 | bufferlist extra_data; | |
459 | ||
460 | bool got_attrs{false}; | |
461 | bool got_extra_data{false}; | |
462 | ||
463 | rgw_io_id io_read_mask; | |
464 | ||
465 | protected: | |
466 | rgw_rest_obj rest_obj; | |
467 | ||
468 | struct range_info { | |
469 | bool is_set{false}; | |
470 | uint64_t ofs; | |
471 | uint64_t size; | |
472 | } range; | |
473 | ||
474 | ceph::real_time mtime; | |
475 | string etag; | |
476 | ||
477 | public: | |
478 | RGWStreamReadHTTPResourceCRF(CephContext *_cct, | |
479 | RGWCoroutinesEnv *_env, | |
480 | RGWCoroutine *_caller, | |
481 | RGWHTTPManager *_http_manager, | |
482 | const rgw_obj_key& _src_key) : cct(_cct), | |
483 | env(_env), | |
484 | caller(_caller), | |
485 | http_manager(_http_manager) { | |
486 | rest_obj.init(_src_key); | |
487 | } | |
488 | ~RGWStreamReadHTTPResourceCRF(); | |
489 | ||
490 | int init() override; | |
491 | int read(bufferlist *data, uint64_t max, bool *need_retry) override; /* reentrant */ | |
492 | int decode_rest_obj(map<string, string>& headers, bufferlist& extra_data) override; | |
493 | bool has_attrs() override; | |
494 | void get_attrs(std::map<string, string> *attrs) override; | |
495 | bool is_done(); | |
496 | virtual bool need_extra_data() { return false; } | |
497 | ||
498 | void set_req(RGWHTTPStreamRWRequest *r) { | |
499 | req = r; | |
500 | } | |
501 | ||
502 | rgw_rest_obj& get_rest_obj() { | |
503 | return rest_obj; | |
504 | } | |
505 | ||
506 | void set_range(uint64_t ofs, uint64_t size) { | |
507 | range.is_set = true; | |
508 | range.ofs = ofs; | |
509 | range.size = size; | |
510 | } | |
511 | }; | |
512 | ||
513 | class RGWStreamWriteHTTPResourceCRF : public RGWStreamWriteResourceCRF { | |
514 | protected: | |
515 | RGWCoroutinesEnv *env; | |
516 | RGWCoroutine *caller; | |
517 | RGWHTTPManager *http_manager; | |
518 | ||
519 | using lock_guard = std::lock_guard<std::mutex>; | |
520 | ||
521 | std::mutex blocked_lock; | |
522 | bool is_blocked; | |
523 | ||
524 | RGWHTTPStreamRWRequest *req{nullptr}; | |
525 | ||
526 | struct multipart_info { | |
527 | bool is_multipart{false}; | |
528 | string upload_id; | |
529 | int part_num{0}; | |
530 | uint64_t part_size; | |
531 | } multipart; | |
532 | ||
533 | class WriteDrainNotify : public RGWWriteDrainCB { | |
534 | RGWStreamWriteHTTPResourceCRF *crf; | |
535 | public: | |
536 | explicit WriteDrainNotify(RGWStreamWriteHTTPResourceCRF *_crf) : crf(_crf) {} | |
537 | void notify(uint64_t pending_size) override; | |
538 | } write_drain_notify_cb; | |
539 | ||
540 | public: | |
541 | RGWStreamWriteHTTPResourceCRF(CephContext *_cct, | |
542 | RGWCoroutinesEnv *_env, | |
543 | RGWCoroutine *_caller, | |
544 | RGWHTTPManager *_http_manager) : env(_env), | |
545 | caller(_caller), | |
546 | http_manager(_http_manager), | |
547 | write_drain_notify_cb(this) {} | |
548 | virtual ~RGWStreamWriteHTTPResourceCRF(); | |
549 | ||
550 | int init() override { | |
551 | return 0; | |
552 | } | |
553 | void send_ready(const rgw_rest_obj& rest_obj) override; | |
554 | int send() override; | |
555 | int write(bufferlist& data, bool *need_retry) override; /* reentrant */ | |
556 | void write_drain_notify(uint64_t pending_size); | |
557 | int drain_writes(bool *need_retry) override; /* reentrant */ | |
558 | ||
559 | virtual void handle_headers(const std::map<string, string>& headers) {} | |
560 | ||
561 | void set_req(RGWHTTPStreamRWRequest *r) { | |
562 | req = r; | |
563 | } | |
564 | ||
565 | void set_multipart(const string& upload_id, int part_num, uint64_t part_size) { | |
566 | multipart.is_multipart = true; | |
567 | multipart.upload_id = upload_id; | |
568 | multipart.part_num = part_num; | |
569 | multipart.part_size = part_size; | |
570 | } | |
571 | }; | |
572 | ||
573 | class RGWStreamSpliceCR : public RGWCoroutine { | |
574 | CephContext *cct; | |
575 | RGWHTTPManager *http_manager; | |
576 | string url; | |
577 | std::shared_ptr<RGWStreamReadHTTPResourceCRF> in_crf; | |
578 | std::shared_ptr<RGWStreamWriteHTTPResourceCRF> out_crf; | |
579 | bufferlist bl; | |
580 | bool need_retry{false}; | |
581 | bool sent_attrs{false}; | |
582 | uint64_t total_read{0}; | |
583 | int ret{0}; | |
584 | public: | |
585 | RGWStreamSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr, | |
586 | std::shared_ptr<RGWStreamReadHTTPResourceCRF>& _in_crf, | |
587 | std::shared_ptr<RGWStreamWriteHTTPResourceCRF>& _out_crf); | |
588 | ~RGWStreamSpliceCR(); | |
589 | ||
590 | int operate() override; | |
591 | }; | |
592 | ||
7c673cae | 593 | #endif |