1 // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
2 // vim: ts=8 sw=2 smarttab ft=cpp
4 #include "rgw_cr_rest.h"
6 #include "rgw_coroutine.h"
8 // re-include our assert to clobber the system one; fix dout:
9 #include "include/ceph_assert.h"
11 #include <boost/asio/yield.hpp>
13 #define dout_context g_ceph_context
14 #define dout_subsys ceph_subsys_rgw
// Constructor: stores the coroutine environment (_env), the coroutine (_cr)
// and the HTTP stream request (_req), then caches in io_id the id the request
// reports for the combined HTTPCLIENT_IO_READ | HTTPCLIENT_IO_CONTROL channels.
// NOTE(review): the closing brace of this definition is not present in this view.
16 RGWCRHTTPGetDataCB::RGWCRHTTPGetDataCB(RGWCoroutinesEnv
*_env
, RGWCoroutine
*_cr
, RGWHTTPStreamRWRequest
*_req
) : env(_env
), cr(_cr
), req(_req
) {
17 io_id
= req
->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ
|RGWHTTPClient::HTTPCLIENT_IO_CONTROL
);
// Buffering window (2 MiB) for RGWCRHTTPGetDataCB: handle_data() compares the
// buffered length against half, one and two times this value, and claim_data()
// compares against it when deciding whether to unpause the receiver.
// The expansion is parenthesized so that it behaves as a single operand next
// to operators such as '%' or '/' (matching PENDING_WRITES_WINDOW below).
#define GET_DATA_WINDOW_SIZE (2 * 1024 * 1024)
// handle_data(): takes a newly received bufferlist `bl` plus a `pause` out-flag.
// The visible steps are: a check of the buffered `data` length against half the
// window, filling of the extra-data prefix, and notification of the coroutine
// once a full window is buffered.
// NOTE(review): several lines of this function (branch bodies, the append of
// `bl` into `data`, the return statement and closing braces) are not present
// in this view; the comments below cover only what is visible.
23 int RGWCRHTTPGetDataCB::handle_data(bufferlist
& bl
, bool *pause
) {
// Branch taken while less than half a window is buffered; its body is not visible here.
24 if (data
.length() < GET_DATA_WINDOW_SIZE
/ 2) {
29 uint64_t bl_len
= bl
.length();
// Everything below runs with `lock` held.
31 std::lock_guard l
{lock
};
// While the extra-data prefix is incomplete, splice up to the number of bytes
// still missing from the front of `bl` into extra_data, then re-evaluate
// whether the prefix has reached extra_data_len.
33 if (!got_all_extra_data
) {
34 uint64_t max
= extra_data_len
- extra_data
.length();
38 bl
.splice(0, max
, &extra_data
);
40 got_all_extra_data
= extra_data
.length() == extra_data_len
;
46 uint64_t data_len
= data
.length();
// Once at least one full window is buffered and `notified` is not set, signal
// io completion for (cr, io_id) through the coroutine manager.
47 if (data_len
>= GET_DATA_WINDOW_SIZE
&& !notified
) {
49 env
->manager
->io_complete(cr
, io_id
);
// Two full windows buffered. NOTE(review): the body is not visible; presumably
// this is where the pause out-flag is set — confirm against the full file.
51 if (data_len
>= 2 * GET_DATA_WINDOW_SIZE
) {
// claim_data(): moves buffered bytes from `data` into `dest` (splice from
// offset 0 with length `max`) while holding `lock`, and calls
// req->unpause_receive() when `paused` is set and the remaining buffer is
// within one window.
// NOTE(review): the bodies of the two length checks and the closing braces are
// not present in this view.
58 void RGWCRHTTPGetDataCB::claim_data(bufferlist
*dest
, uint64_t max
) {
59 bool need_to_unpause
= false;
62 std::lock_guard l
{lock
};
// Nothing buffered; the handling of this case is not visible here.
64 if (data
.length() == 0) {
// Fewer bytes buffered than requested; presumably `max` is clamped to the
// buffered length in the missing body — confirm.
68 if (data
.length() < max
) {
72 data
.splice(0, max
, dest
);
// Unpause only if the receiver was paused and the backlog is back at or below
// one window.
73 need_to_unpause
= (paused
&& data
.length() <= GET_DATA_WINDOW_SIZE
);
76 if (need_to_unpause
) {
77 req
->unpause_receive();
// Destructor: the visible statement waits on the request with null_yield.
// NOTE(review): the braces and any surrounding guard or cleanup lines are not
// present in this view.
81 RGWStreamReadHTTPResourceCRF::~RGWStreamReadHTTPResourceCRF()
85 req
->wait(null_yield
);
// init(): registers `req` as a new io on the coroutine stack, constructs the
// in_cb receive callback in place from (env, caller, req), and hands the
// request to the HTTP manager.
// NOTE(review): the handling of `r` and the return statement are not present
// in this view.
90 int RGWStreamReadHTTPResourceCRF::init()
92 env
->stack
->init_new_io(req
);
94 in_cb
.emplace(env
, caller
, req
);
96 int r
= http_manager
->add_request(req
);
// send(): registers `req` as a new io on the coroutine stack, installs
// write_drain_notify_cb as the request's write-drain callback, and hands the
// request to the HTTP manager.
// NOTE(review): the handling of `r` and the return statement are not present
// in this view.
104 int RGWStreamWriteHTTPResourceCRF::send()
106 env
->stack
->init_new_io(req
);
108 req
->set_write_drain_cb(&write_drain_notify_cb
);
110 int r
= http_manager
->add_request(req
);
// has_attrs(): only the signature is present in this view; the body is missing.
// RGWStreamSpliceCR::operate() calls it right after logging the number of bytes read.
118 bool RGWStreamReadHTTPResourceCRF::has_attrs()
// get_attrs(): fills `attrs` by forwarding it to req->get_out_headers().
// NOTE(review): the enclosing braces are not present in this view.
123 void RGWStreamReadHTTPResourceCRF::get_attrs(std::map
<string
, string
> *attrs
)
125 req
->get_out_headers(attrs
);
// decode_rest_obj(): generic implementation that stores every (name, value)
// entry of `headers` into rest_obj.attrs. `extra_data` is not referenced in
// the visible lines.
// NOTE(review): the loop's closing brace and the return statement are not
// present in this view.
// NOTE(review): `auto header` copies each map entry per iteration; a
// `const auto&` binding would avoid that — consider when the full body is
// available.
128 int RGWStreamReadHTTPResourceCRF::decode_rest_obj(map
<string
, string
>& headers
, bufferlist
& extra_data
) {
129 /* basic generic implementation */
130 for (auto header
: headers
) {
131 const string
& val
= header
.second
;
133 rest_obj
.attrs
[header
.first
] = val
;
// read(): stackless-coroutine style reader (reenter/yield on read_state).
// Visible flow: compute the READ | CONTROL io mask; loop while the request is
// not done (the second half of the loop condition is missing); block on io
// when in_cb has no data; on first use, collect the extra-data prefix and
// decode it together with the response headers via decode_rest_obj(); then
// claim up to max_size bytes into `out`.
// NOTE(review): many lines (loop condition tail, error branches, returns,
// io_pending handling, closing braces) are not present in this view.
139 int RGWStreamReadHTTPResourceCRF::read(bufferlist
*out
, uint64_t max_size
, bool *io_pending
)
141 reenter(&read_state
) {
142 io_read_mask
= req
->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ
| RGWHTTPClient::HTTPCLIENT_IO_CONTROL
);
143 while (!req
->is_done() ||
// No buffered data yet: yield until io on io_read_mask completes.
146 if (!in_cb
->has_data()) {
147 yield caller
->io_block(0, io_read_mask
);
// Extra-data handling runs only until got_extra_data is set (see below).
150 if (need_extra_data() && !got_extra_data
) {
// Prefix still incomplete; the handling of this case is not visible here.
151 if (!in_cb
->has_all_extra_data()) {
154 extra_data
.claim_append(in_cb
->get_extra_data());
155 map
<string
, string
> attrs
;
156 req
->get_out_headers(&attrs
);
157 int ret
= decode_rest_obj(attrs
, extra_data
);
// Error log for decode_rest_obj(); the guarding condition is not visible here.
159 ldout(cct
, 0) << "ERROR: " << __func__
<< " decode_rest_obj() returned ret=" << ret
<< dendl
;
162 got_extra_data
= true;
165 in_cb
->claim_data(out
, max_size
);
166 if (out
->length() == 0) {
167 /* this may happen if we just read the prepended extra_data and didn't have any data
168 * after. In that case, retry reading, so that caller doesn't assume it's EOF.
172 if (!req
->is_done() || out
->length() >= max_size
) {
// is_done(): returns whatever the underlying request reports from is_done().
// NOTE(review): the enclosing braces are not present in this view.
180 bool RGWStreamReadHTTPResourceCRF::is_done()
182 return req
->is_done();
// Destructor: the visible statement waits on the request with null_yield.
// NOTE(review): the braces and any surrounding guard or cleanup lines are not
// present in this view.
185 RGWStreamWriteHTTPResourceCRF::~RGWStreamWriteHTTPResourceCRF()
189 req
->wait(null_yield
);
// send_ready(): prepares the outgoing request from `rest_obj` by setting the
// send length to rest_obj.content_len and appending one header per entry of
// rest_obj.attrs (name = first, value = second).
// NOTE(review): the enclosing braces are not present in this view.
// NOTE(review): `auto h` copies each attrs entry per iteration; a
// `const auto&` binding would avoid that — consider when the full body is
// available.
194 void RGWStreamWriteHTTPResourceCRF::send_ready(const rgw_rest_obj
& rest_obj
)
196 req
->set_send_length(rest_obj
.content_len
);
197 for (auto h
: rest_obj
.attrs
) {
198 req
->append_header(h
.first
, h
.second
);
// Pending-send threshold (1 MiB): write() compares the request's
// get_pending_send_size() against it, and write_drain_notify() signals io
// completion once the pending size drops below half of it while is_blocked is set.
202 #define PENDING_WRITES_WINDOW (1 * 1024 * 1024)
// write_drain_notify(): called with the current pending send size. Under
// blocked_lock, if is_blocked is set and the pending size has dropped below
// half of PENDING_WRITES_WINDOW, it signals io completion for `caller` on the
// request's WRITE | CONTROL io id.
// NOTE(review): the remaining lines of the branch (for example any update of
// is_blocked) and the closing braces are not present in this view.
204 void RGWStreamWriteHTTPResourceCRF::write_drain_notify(uint64_t pending_size
)
206 lock_guard
l(blocked_lock
);
207 if (is_blocked
&& (pending_size
< PENDING_WRITES_WINDOW
/ 2)) {
208 env
->manager
->io_complete(caller
, req
->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_WRITE
| RGWHTTPClient::HTTPCLIENT_IO_CONTROL
));
// WriteDrainNotify::notify(): forwards the pending size to
// crf->write_drain_notify().
// NOTE(review): the enclosing braces are not present in this view.
213 void RGWStreamWriteHTTPResourceCRF::WriteDrainNotify::notify(uint64_t pending_size
)
215 crf
->write_drain_notify(pending_size
);
// write(): stackless-coroutine style writer (reenter/yield on write_state).
// Visible flow: while the request is not done, if the pending send size has
// reached PENDING_WRITES_WINDOW, take blocked_lock and then yield on the
// WRITE | CONTROL io id; afterwards yield on req->add_send_data(data); the
// visible return yields req->get_status().
// NOTE(review): the statements executed under blocked_lock, the io_pending
// handling, other returns and the closing braces are not present in this view.
218 int RGWStreamWriteHTTPResourceCRF::write(bufferlist
& data
, bool *io_pending
)
220 reenter(&write_state
) {
221 while (!req
->is_done()) {
// Back-pressure: too many bytes are still queued on the request.
223 if (req
->get_pending_send_size() >= PENDING_WRITES_WINDOW
) {
226 lock_guard
l(blocked_lock
);
229 /* it's ok to unlock here, even if io_complete() arrives before io_block(), it'll wakeup
232 yield caller
->io_block(0, req
->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_WRITE
| RGWHTTPClient::HTTPCLIENT_IO_CONTROL
));
234 yield req
->add_send_data(data
);
236 return req
->get_status();
// drain_writes(): stackless-coroutine style (reenter/yield on drain_state).
// Visible flow: yield on req->finish_write(); set *need_retry to whether the
// request is still not done; while it is not done, block on the CONTROL io id
// and refresh *need_retry; then fetch the response headers, pass them to
// handle_headers(), and return req->get_req_retcode().
// NOTE(review): intermediate returns and closing braces are not present in
// this view.
241 int RGWStreamWriteHTTPResourceCRF::drain_writes(bool *need_retry
)
243 reenter(&drain_state
) {
245 yield req
->finish_write();
246 *need_retry
= !req
->is_done();
// Wait for completion, re-reporting the retry state after every wakeup.
247 while (!req
->is_done()) {
248 yield caller
->io_block(0, req
->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_CONTROL
));
249 *need_retry
= !req
->is_done();
252 map
<string
, string
> headers
;
253 req
->get_out_headers(&headers
);
254 handle_headers(headers
);
256 return req
->get_req_retcode();
// Constructor: initializes the RGWCoroutine base with _cct and stores the
// context, the HTTP manager and the two shared_ptr stream endpoints (in_crf
// for reading, out_crf for writing). The body is empty.
261 RGWStreamSpliceCR::RGWStreamSpliceCR(CephContext
*_cct
, RGWHTTPManager
*_mgr
,
262 shared_ptr
<RGWStreamReadHTTPResourceCRF
>& _in_crf
,
263 shared_ptr
<RGWStreamWriteHTTPResourceCRF
>& _out_crf
) : RGWCoroutine(_cct
), cct(_cct
), http_manager(_mgr
),
264 in_crf(_in_crf
), out_crf(_out_crf
) {}
// Destructor: empty body; nothing is released explicitly here.
265 RGWStreamSpliceCR::~RGWStreamSpliceCR() { }
// operate(): coroutine body that copies a stream from in_crf to out_crf.
// Visible flow: init the reader; repeatedly read up to 4 MiB into `bl`
// (retrying while need_retry is set); on the first pass initialise the writer,
// pass it the reader's rest object via send_ready() and start it with send();
// write `bl` out (retrying while need_retry is set) and add its length to
// total_read; finally drain the writer and finish with set_cr_done().
// NOTE(review): the reenter/yield scaffolding, loop heads, the conditions
// guarding each error return, and the closing braces are not present in this
// view; the comments below cover only what is visible.
267 int RGWStreamSpliceCR::operate() {
270 int ret
= in_crf
->init();
272 return set_cr_error(ret
);
282 ret
= in_crf
->read(&bl
, 4 * 1024 * 1024, &need_retry
);
284 return set_cr_error(ret
);
// NOTE(review): this log prints `retcode` while the following return uses
// `ret` — confirm against the missing condition which of the two is intended.
289 ldout(cct
, 20) << __func__
<< ": in_crf->read() retcode=" << retcode
<< dendl
;
290 return set_cr_error(ret
);
292 } while (need_retry
);
294 ldout(cct
, 20) << "read " << bl
.length() << " bytes" << dendl
;
// Without attrs, no payload bytes are expected to have been returned yet.
296 if (!in_crf
->has_attrs()) {
297 assert (bl
.length() == 0);
302 int ret
= out_crf
->init();
304 return set_cr_error(ret
);
306 out_crf
->send_ready(in_crf
->get_rest_obj());
307 ret
= out_crf
->send();
309 return set_cr_error(ret
);
// Empty read on a finished request; presumably the exit condition of the copy
// loop — the body is not visible here.
314 if (bl
.length() == 0 && in_crf
->is_done()) {
318 total_read
+= bl
.length();
322 ldout(cct
, 20) << "writing " << bl
.length() << " bytes" << dendl
;
323 ret
= out_crf
->write(bl
, &need_retry
);
325 return set_cr_error(ret
);
// NOTE(review): same `retcode`-vs-`ret` mismatch as in the read error path above.
330 ldout(cct
, 20) << __func__
<< ": out_crf->write() retcode=" << retcode
<< dendl
;
331 return set_cr_error(ret
);
333 } while (need_retry
);
338 int ret
= out_crf
->drain_writes(&need_retry
);
340 return set_cr_error(ret
);
343 } while (need_retry
);
345 return set_cr_done();