]> git.proxmox.com Git - ceph.git/blame - ceph/src/rgw/rgw_cr_rest.cc
import ceph pacific 16.2.5
[ceph.git] / ceph / src / rgw / rgw_cr_rest.cc
CommitLineData
11fdf7f2 1// -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*-
9f95a23c 2// vim: ts=8 sw=2 smarttab ft=cpp
11fdf7f2
TL
3
4#include "rgw_cr_rest.h"
5
6#include "rgw_coroutine.h"
7
8// re-include our assert to clobber the system one; fix dout:
9#include "include/ceph_assert.h"
10
11#include <boost/asio/yield.hpp>
12
13#define dout_context g_ceph_context
14#define dout_subsys ceph_subsys_rgw
15
9f95a23c 16RGWCRHTTPGetDataCB::RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr, RGWHTTPStreamRWRequest *_req) : env(_env), cr(_cr), req(_req) {
11fdf7f2
TL
17 io_id = req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ |RGWHTTPClient::HTTPCLIENT_IO_CONTROL);
18 req->set_in_cb(this);
19}
20
/* Size in bytes (2 MiB) of the receive buffering window used by
 * RGWCRHTTPGetDataCB. Parenthesized so the macro expands as a single value
 * inside any surrounding expression (division, unary minus, ...). */
#define GET_DATA_WINDOW_SIZE (2 * 1024 * 1024)
22
23int RGWCRHTTPGetDataCB::handle_data(bufferlist& bl, bool *pause) {
24 if (data.length() < GET_DATA_WINDOW_SIZE / 2) {
25 notified = false;
26 }
27
28 {
29 uint64_t bl_len = bl.length();
30
9f95a23c 31 std::lock_guard l{lock};
11fdf7f2
TL
32
33 if (!got_all_extra_data) {
34 uint64_t max = extra_data_len - extra_data.length();
35 if (max > bl_len) {
36 max = bl_len;
37 }
38 bl.splice(0, max, &extra_data);
39 bl_len -= max;
40 got_all_extra_data = extra_data.length() == extra_data_len;
41 }
42
43 data.append(bl);
44 }
45
46 uint64_t data_len = data.length();
47 if (data_len >= GET_DATA_WINDOW_SIZE && !notified) {
48 notified = true;
49 env->manager->io_complete(cr, io_id);
50 }
51 if (data_len >= 2 * GET_DATA_WINDOW_SIZE) {
52 *pause = true;
53 paused = true;
54 }
55 return 0;
56}
57
58void RGWCRHTTPGetDataCB::claim_data(bufferlist *dest, uint64_t max) {
59 bool need_to_unpause = false;
60
61 {
9f95a23c 62 std::lock_guard l{lock};
11fdf7f2
TL
63
64 if (data.length() == 0) {
65 return;
66 }
67
68 if (data.length() < max) {
69 max = data.length();
70 }
71
72 data.splice(0, max, dest);
73 need_to_unpause = (paused && data.length() <= GET_DATA_WINDOW_SIZE);
74 }
75
76 if (need_to_unpause) {
77 req->unpause_receive();
78 }
79}
80
81RGWStreamReadHTTPResourceCRF::~RGWStreamReadHTTPResourceCRF()
82{
83 if (req) {
84 req->cancel();
9f95a23c 85 req->wait(null_yield);
11fdf7f2
TL
86 delete req;
87 }
88}
89
b3b6e05e 90int RGWStreamReadHTTPResourceCRF::init(const DoutPrefixProvider *dpp)
11fdf7f2
TL
91{
92 env->stack->init_new_io(req);
93
94 in_cb.emplace(env, caller, req);
95
96 int r = http_manager->add_request(req);
97 if (r < 0) {
98 return r;
99 }
100
101 return 0;
102}
103
104int RGWStreamWriteHTTPResourceCRF::send()
105{
106 env->stack->init_new_io(req);
107
108 req->set_write_drain_cb(&write_drain_notify_cb);
109
110 int r = http_manager->add_request(req);
111 if (r < 0) {
112 return r;
113 }
114
115 return 0;
116}
117
118bool RGWStreamReadHTTPResourceCRF::has_attrs()
119{
120 return got_attrs;
121}
122
123void RGWStreamReadHTTPResourceCRF::get_attrs(std::map<string, string> *attrs)
124{
125 req->get_out_headers(attrs);
126}
127
128int RGWStreamReadHTTPResourceCRF::decode_rest_obj(map<string, string>& headers, bufferlist& extra_data) {
129 /* basic generic implementation */
130 for (auto header : headers) {
131 const string& val = header.second;
132
133 rest_obj.attrs[header.first] = val;
134 }
135
136 return 0;
137}
138
139int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool *io_pending)
140{
141 reenter(&read_state) {
142 io_read_mask = req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ | RGWHTTPClient::HTTPCLIENT_IO_CONTROL);
143 while (!req->is_done() ||
144 in_cb->has_data()) {
145 *io_pending = true;
146 if (!in_cb->has_data()) {
147 yield caller->io_block(0, io_read_mask);
148 }
149 got_attrs = true;
150 if (need_extra_data() && !got_extra_data) {
151 if (!in_cb->has_all_extra_data()) {
152 continue;
153 }
154 extra_data.claim_append(in_cb->get_extra_data());
155 map<string, string> attrs;
156 req->get_out_headers(&attrs);
157 int ret = decode_rest_obj(attrs, extra_data);
158 if (ret < 0) {
159 ldout(cct, 0) << "ERROR: " << __func__ << " decode_rest_obj() returned ret=" << ret << dendl;
160 return ret;
161 }
162 got_extra_data = true;
163 }
164 *io_pending = false;
165 in_cb->claim_data(out, max_size);
166 if (out->length() == 0) {
167 /* this may happen if we just read the prepended extra_data and didn't have any data
168 * after. In that case, retry reading, so that caller doesn't assume it's EOF.
169 */
170 continue;
171 }
172 if (!req->is_done() || out->length() >= max_size) {
173 yield;
174 }
175 }
176 }
177 return 0;
178}
179
180bool RGWStreamReadHTTPResourceCRF::is_done()
181{
182 return req->is_done();
183}
184
185RGWStreamWriteHTTPResourceCRF::~RGWStreamWriteHTTPResourceCRF()
186{
187 if (req) {
188 req->cancel();
9f95a23c 189 req->wait(null_yield);
11fdf7f2
TL
190 delete req;
191 }
192}
193
b3b6e05e 194void RGWStreamWriteHTTPResourceCRF::send_ready(const DoutPrefixProvider *dpp, const rgw_rest_obj& rest_obj)
11fdf7f2
TL
195{
196 req->set_send_length(rest_obj.content_len);
197 for (auto h : rest_obj.attrs) {
198 req->append_header(h.first, h.second);
199 }
200}
201
202#define PENDING_WRITES_WINDOW (1 * 1024 * 1024)
203
204void RGWStreamWriteHTTPResourceCRF::write_drain_notify(uint64_t pending_size)
205{
206 lock_guard l(blocked_lock);
207 if (is_blocked && (pending_size < PENDING_WRITES_WINDOW / 2)) {
208 env->manager->io_complete(caller, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_WRITE | RGWHTTPClient::HTTPCLIENT_IO_CONTROL));
209 is_blocked = false;
210 }
211}
212
213void RGWStreamWriteHTTPResourceCRF::WriteDrainNotify::notify(uint64_t pending_size)
214{
215 crf->write_drain_notify(pending_size);
216}
217
218int RGWStreamWriteHTTPResourceCRF::write(bufferlist& data, bool *io_pending)
219{
220 reenter(&write_state) {
221 while (!req->is_done()) {
222 *io_pending = false;
223 if (req->get_pending_send_size() >= PENDING_WRITES_WINDOW) {
224 *io_pending = true;
225 {
226 lock_guard l(blocked_lock);
227 is_blocked = true;
228
229 /* it's ok to unlock here, even if io_complete() arrives before io_block(), it'll wakeup
230 * correctly */
231 }
232 yield caller->io_block(0, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_WRITE | RGWHTTPClient::HTTPCLIENT_IO_CONTROL));
233 }
234 yield req->add_send_data(data);
235 }
236 return req->get_status();
237 }
238 return 0;
239}
240
241int RGWStreamWriteHTTPResourceCRF::drain_writes(bool *need_retry)
242{
243 reenter(&drain_state) {
244 *need_retry = true;
245 yield req->finish_write();
246 *need_retry = !req->is_done();
247 while (!req->is_done()) {
248 yield caller->io_block(0, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_CONTROL));
249 *need_retry = !req->is_done();
250 }
251
252 map<string, string> headers;
253 req->get_out_headers(&headers);
254 handle_headers(headers);
255
256 return req->get_req_retcode();
257 }
258 return 0;
259}
260
261RGWStreamSpliceCR::RGWStreamSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr,
262 shared_ptr<RGWStreamReadHTTPResourceCRF>& _in_crf,
263 shared_ptr<RGWStreamWriteHTTPResourceCRF>& _out_crf) : RGWCoroutine(_cct), cct(_cct), http_manager(_mgr),
264 in_crf(_in_crf), out_crf(_out_crf) {}
265RGWStreamSpliceCR::~RGWStreamSpliceCR() { }
266
b3b6e05e 267int RGWStreamSpliceCR::operate(const DoutPrefixProvider *dpp) {
11fdf7f2
TL
268 reenter(this) {
269 {
b3b6e05e 270 int ret = in_crf->init(dpp);
11fdf7f2
TL
271 if (ret < 0) {
272 return set_cr_error(ret);
273 }
274 }
275
276 do {
277
278 bl.clear();
279
280 do {
281 yield {
282 ret = in_crf->read(&bl, 4 * 1024 * 1024, &need_retry);
283 if (ret < 0) {
284 return set_cr_error(ret);
285 }
286 }
287
288 if (retcode < 0) {
289 ldout(cct, 20) << __func__ << ": in_crf->read() retcode=" << retcode << dendl;
290 return set_cr_error(ret);
291 }
292 } while (need_retry);
293
294 ldout(cct, 20) << "read " << bl.length() << " bytes" << dendl;
295
296 if (!in_crf->has_attrs()) {
297 assert (bl.length() == 0);
298 continue;
299 }
300
301 if (!sent_attrs) {
302 int ret = out_crf->init();
303 if (ret < 0) {
304 return set_cr_error(ret);
305 }
b3b6e05e 306 out_crf->send_ready(dpp, in_crf->get_rest_obj());
11fdf7f2
TL
307 ret = out_crf->send();
308 if (ret < 0) {
309 return set_cr_error(ret);
310 }
311 sent_attrs = true;
312 }
313
314 if (bl.length() == 0 && in_crf->is_done()) {
315 break;
316 }
317
318 total_read += bl.length();
319
320 do {
321 yield {
322 ldout(cct, 20) << "writing " << bl.length() << " bytes" << dendl;
323 ret = out_crf->write(bl, &need_retry);
324 if (ret < 0) {
325 return set_cr_error(ret);
326 }
327 }
328
329 if (retcode < 0) {
330 ldout(cct, 20) << __func__ << ": out_crf->write() retcode=" << retcode << dendl;
331 return set_cr_error(ret);
332 }
333 } while (need_retry);
334 } while (true);
335
336 do {
337 yield {
338 int ret = out_crf->drain_writes(&need_retry);
339 if (ret < 0) {
340 return set_cr_error(ret);
341 }
342 }
343 } while (need_retry);
344
345 return set_cr_done();
346 }
347 return 0;
348}
349