]>
Commit | Line | Data |
---|---|---|
11fdf7f2 | 1 | // -*- mode:C++; tab-width:8; c-basic-offset:2; indent-tabs-mode:t -*- |
9f95a23c | 2 | // vim: ts=8 sw=2 smarttab ft=cpp |
11fdf7f2 TL |
3 | |
4 | #include "rgw_cr_rest.h" | |
5 | ||
6 | #include "rgw_coroutine.h" | |
7 | ||
8 | // re-include our assert to clobber the system one; fix dout: | |
9 | #include "include/ceph_assert.h" | |
10 | ||
11 | #include <boost/asio/yield.hpp> | |
12 | ||
13 | #define dout_context g_ceph_context | |
14 | #define dout_subsys ceph_subsys_rgw | |
15 | ||
9f95a23c | 16 | RGWCRHTTPGetDataCB::RGWCRHTTPGetDataCB(RGWCoroutinesEnv *_env, RGWCoroutine *_cr, RGWHTTPStreamRWRequest *_req) : env(_env), cr(_cr), req(_req) { |
11fdf7f2 TL |
17 | io_id = req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ |RGWHTTPClient::HTTPCLIENT_IO_CONTROL); |
18 | req->set_in_cb(this); | |
19 | } | |
20 | ||
/* buffering window (bytes) used by RGWCRHTTPGetDataCB; parenthesized so the
 * macro expands safely inside any surrounding expression */
#define GET_DATA_WINDOW_SIZE (2 * 1024 * 1024)
22 | ||
23 | int RGWCRHTTPGetDataCB::handle_data(bufferlist& bl, bool *pause) { | |
24 | if (data.length() < GET_DATA_WINDOW_SIZE / 2) { | |
25 | notified = false; | |
26 | } | |
27 | ||
28 | { | |
29 | uint64_t bl_len = bl.length(); | |
30 | ||
9f95a23c | 31 | std::lock_guard l{lock}; |
11fdf7f2 TL |
32 | |
33 | if (!got_all_extra_data) { | |
34 | uint64_t max = extra_data_len - extra_data.length(); | |
35 | if (max > bl_len) { | |
36 | max = bl_len; | |
37 | } | |
38 | bl.splice(0, max, &extra_data); | |
39 | bl_len -= max; | |
40 | got_all_extra_data = extra_data.length() == extra_data_len; | |
41 | } | |
42 | ||
43 | data.append(bl); | |
44 | } | |
45 | ||
46 | uint64_t data_len = data.length(); | |
47 | if (data_len >= GET_DATA_WINDOW_SIZE && !notified) { | |
48 | notified = true; | |
49 | env->manager->io_complete(cr, io_id); | |
50 | } | |
51 | if (data_len >= 2 * GET_DATA_WINDOW_SIZE) { | |
52 | *pause = true; | |
53 | paused = true; | |
54 | } | |
55 | return 0; | |
56 | } | |
57 | ||
58 | void RGWCRHTTPGetDataCB::claim_data(bufferlist *dest, uint64_t max) { | |
59 | bool need_to_unpause = false; | |
60 | ||
61 | { | |
9f95a23c | 62 | std::lock_guard l{lock}; |
11fdf7f2 TL |
63 | |
64 | if (data.length() == 0) { | |
65 | return; | |
66 | } | |
67 | ||
68 | if (data.length() < max) { | |
69 | max = data.length(); | |
70 | } | |
71 | ||
72 | data.splice(0, max, dest); | |
73 | need_to_unpause = (paused && data.length() <= GET_DATA_WINDOW_SIZE); | |
74 | } | |
75 | ||
76 | if (need_to_unpause) { | |
77 | req->unpause_receive(); | |
78 | } | |
79 | } | |
80 | ||
81 | RGWStreamReadHTTPResourceCRF::~RGWStreamReadHTTPResourceCRF() | |
82 | { | |
83 | if (req) { | |
84 | req->cancel(); | |
9f95a23c | 85 | req->wait(null_yield); |
11fdf7f2 TL |
86 | delete req; |
87 | } | |
88 | } | |
89 | ||
b3b6e05e | 90 | int RGWStreamReadHTTPResourceCRF::init(const DoutPrefixProvider *dpp) |
11fdf7f2 TL |
91 | { |
92 | env->stack->init_new_io(req); | |
93 | ||
94 | in_cb.emplace(env, caller, req); | |
95 | ||
96 | int r = http_manager->add_request(req); | |
97 | if (r < 0) { | |
98 | return r; | |
99 | } | |
100 | ||
101 | return 0; | |
102 | } | |
103 | ||
104 | int RGWStreamWriteHTTPResourceCRF::send() | |
105 | { | |
106 | env->stack->init_new_io(req); | |
107 | ||
108 | req->set_write_drain_cb(&write_drain_notify_cb); | |
109 | ||
110 | int r = http_manager->add_request(req); | |
111 | if (r < 0) { | |
112 | return r; | |
113 | } | |
114 | ||
115 | return 0; | |
116 | } | |
117 | ||
118 | bool RGWStreamReadHTTPResourceCRF::has_attrs() | |
119 | { | |
120 | return got_attrs; | |
121 | } | |
122 | ||
123 | void RGWStreamReadHTTPResourceCRF::get_attrs(std::map<string, string> *attrs) | |
124 | { | |
125 | req->get_out_headers(attrs); | |
126 | } | |
127 | ||
128 | int RGWStreamReadHTTPResourceCRF::decode_rest_obj(map<string, string>& headers, bufferlist& extra_data) { | |
129 | /* basic generic implementation */ | |
130 | for (auto header : headers) { | |
131 | const string& val = header.second; | |
132 | ||
133 | rest_obj.attrs[header.first] = val; | |
134 | } | |
135 | ||
136 | return 0; | |
137 | } | |
138 | ||
139 | int RGWStreamReadHTTPResourceCRF::read(bufferlist *out, uint64_t max_size, bool *io_pending) | |
140 | { | |
141 | reenter(&read_state) { | |
142 | io_read_mask = req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_READ | RGWHTTPClient::HTTPCLIENT_IO_CONTROL); | |
143 | while (!req->is_done() || | |
144 | in_cb->has_data()) { | |
145 | *io_pending = true; | |
146 | if (!in_cb->has_data()) { | |
147 | yield caller->io_block(0, io_read_mask); | |
148 | } | |
149 | got_attrs = true; | |
150 | if (need_extra_data() && !got_extra_data) { | |
151 | if (!in_cb->has_all_extra_data()) { | |
152 | continue; | |
153 | } | |
154 | extra_data.claim_append(in_cb->get_extra_data()); | |
155 | map<string, string> attrs; | |
156 | req->get_out_headers(&attrs); | |
157 | int ret = decode_rest_obj(attrs, extra_data); | |
158 | if (ret < 0) { | |
159 | ldout(cct, 0) << "ERROR: " << __func__ << " decode_rest_obj() returned ret=" << ret << dendl; | |
160 | return ret; | |
161 | } | |
162 | got_extra_data = true; | |
163 | } | |
164 | *io_pending = false; | |
165 | in_cb->claim_data(out, max_size); | |
166 | if (out->length() == 0) { | |
167 | /* this may happen if we just read the prepended extra_data and didn't have any data | |
168 | * after. In that case, retry reading, so that caller doesn't assume it's EOF. | |
169 | */ | |
170 | continue; | |
171 | } | |
172 | if (!req->is_done() || out->length() >= max_size) { | |
173 | yield; | |
174 | } | |
175 | } | |
176 | } | |
177 | return 0; | |
178 | } | |
179 | ||
180 | bool RGWStreamReadHTTPResourceCRF::is_done() | |
181 | { | |
182 | return req->is_done(); | |
183 | } | |
184 | ||
185 | RGWStreamWriteHTTPResourceCRF::~RGWStreamWriteHTTPResourceCRF() | |
186 | { | |
187 | if (req) { | |
188 | req->cancel(); | |
9f95a23c | 189 | req->wait(null_yield); |
11fdf7f2 TL |
190 | delete req; |
191 | } | |
192 | } | |
193 | ||
b3b6e05e | 194 | void RGWStreamWriteHTTPResourceCRF::send_ready(const DoutPrefixProvider *dpp, const rgw_rest_obj& rest_obj) |
11fdf7f2 TL |
195 | { |
196 | req->set_send_length(rest_obj.content_len); | |
197 | for (auto h : rest_obj.attrs) { | |
198 | req->append_header(h.first, h.second); | |
199 | } | |
200 | } | |
201 | ||
202 | #define PENDING_WRITES_WINDOW (1 * 1024 * 1024) | |
203 | ||
204 | void RGWStreamWriteHTTPResourceCRF::write_drain_notify(uint64_t pending_size) | |
205 | { | |
206 | lock_guard l(blocked_lock); | |
207 | if (is_blocked && (pending_size < PENDING_WRITES_WINDOW / 2)) { | |
208 | env->manager->io_complete(caller, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_WRITE | RGWHTTPClient::HTTPCLIENT_IO_CONTROL)); | |
209 | is_blocked = false; | |
210 | } | |
211 | } | |
212 | ||
213 | void RGWStreamWriteHTTPResourceCRF::WriteDrainNotify::notify(uint64_t pending_size) | |
214 | { | |
215 | crf->write_drain_notify(pending_size); | |
216 | } | |
217 | ||
218 | int RGWStreamWriteHTTPResourceCRF::write(bufferlist& data, bool *io_pending) | |
219 | { | |
220 | reenter(&write_state) { | |
221 | while (!req->is_done()) { | |
222 | *io_pending = false; | |
223 | if (req->get_pending_send_size() >= PENDING_WRITES_WINDOW) { | |
224 | *io_pending = true; | |
225 | { | |
226 | lock_guard l(blocked_lock); | |
227 | is_blocked = true; | |
228 | ||
229 | /* it's ok to unlock here, even if io_complete() arrives before io_block(), it'll wakeup | |
230 | * correctly */ | |
231 | } | |
232 | yield caller->io_block(0, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_WRITE | RGWHTTPClient::HTTPCLIENT_IO_CONTROL)); | |
233 | } | |
234 | yield req->add_send_data(data); | |
235 | } | |
236 | return req->get_status(); | |
237 | } | |
238 | return 0; | |
239 | } | |
240 | ||
241 | int RGWStreamWriteHTTPResourceCRF::drain_writes(bool *need_retry) | |
242 | { | |
243 | reenter(&drain_state) { | |
244 | *need_retry = true; | |
245 | yield req->finish_write(); | |
246 | *need_retry = !req->is_done(); | |
247 | while (!req->is_done()) { | |
248 | yield caller->io_block(0, req->get_io_id(RGWHTTPClient::HTTPCLIENT_IO_CONTROL)); | |
249 | *need_retry = !req->is_done(); | |
250 | } | |
251 | ||
252 | map<string, string> headers; | |
253 | req->get_out_headers(&headers); | |
254 | handle_headers(headers); | |
255 | ||
256 | return req->get_req_retcode(); | |
257 | } | |
258 | return 0; | |
259 | } | |
260 | ||
261 | RGWStreamSpliceCR::RGWStreamSpliceCR(CephContext *_cct, RGWHTTPManager *_mgr, | |
262 | shared_ptr<RGWStreamReadHTTPResourceCRF>& _in_crf, | |
263 | shared_ptr<RGWStreamWriteHTTPResourceCRF>& _out_crf) : RGWCoroutine(_cct), cct(_cct), http_manager(_mgr), | |
264 | in_crf(_in_crf), out_crf(_out_crf) {} | |
265 | RGWStreamSpliceCR::~RGWStreamSpliceCR() { } | |
266 | ||
b3b6e05e | 267 | int RGWStreamSpliceCR::operate(const DoutPrefixProvider *dpp) { |
11fdf7f2 TL |
268 | reenter(this) { |
269 | { | |
b3b6e05e | 270 | int ret = in_crf->init(dpp); |
11fdf7f2 TL |
271 | if (ret < 0) { |
272 | return set_cr_error(ret); | |
273 | } | |
274 | } | |
275 | ||
276 | do { | |
277 | ||
278 | bl.clear(); | |
279 | ||
280 | do { | |
281 | yield { | |
282 | ret = in_crf->read(&bl, 4 * 1024 * 1024, &need_retry); | |
283 | if (ret < 0) { | |
284 | return set_cr_error(ret); | |
285 | } | |
286 | } | |
287 | ||
288 | if (retcode < 0) { | |
289 | ldout(cct, 20) << __func__ << ": in_crf->read() retcode=" << retcode << dendl; | |
290 | return set_cr_error(ret); | |
291 | } | |
292 | } while (need_retry); | |
293 | ||
294 | ldout(cct, 20) << "read " << bl.length() << " bytes" << dendl; | |
295 | ||
296 | if (!in_crf->has_attrs()) { | |
297 | assert (bl.length() == 0); | |
298 | continue; | |
299 | } | |
300 | ||
301 | if (!sent_attrs) { | |
302 | int ret = out_crf->init(); | |
303 | if (ret < 0) { | |
304 | return set_cr_error(ret); | |
305 | } | |
b3b6e05e | 306 | out_crf->send_ready(dpp, in_crf->get_rest_obj()); |
11fdf7f2 TL |
307 | ret = out_crf->send(); |
308 | if (ret < 0) { | |
309 | return set_cr_error(ret); | |
310 | } | |
311 | sent_attrs = true; | |
312 | } | |
313 | ||
314 | if (bl.length() == 0 && in_crf->is_done()) { | |
315 | break; | |
316 | } | |
317 | ||
318 | total_read += bl.length(); | |
319 | ||
320 | do { | |
321 | yield { | |
322 | ldout(cct, 20) << "writing " << bl.length() << " bytes" << dendl; | |
323 | ret = out_crf->write(bl, &need_retry); | |
324 | if (ret < 0) { | |
325 | return set_cr_error(ret); | |
326 | } | |
327 | } | |
328 | ||
329 | if (retcode < 0) { | |
330 | ldout(cct, 20) << __func__ << ": out_crf->write() retcode=" << retcode << dendl; | |
331 | return set_cr_error(ret); | |
332 | } | |
333 | } while (need_retry); | |
334 | } while (true); | |
335 | ||
336 | do { | |
337 | yield { | |
338 | int ret = out_crf->drain_writes(&need_retry); | |
339 | if (ret < 0) { | |
340 | return set_cr_error(ret); | |
341 | } | |
342 | } | |
343 | } while (need_retry); | |
344 | ||
345 | return set_cr_done(); | |
346 | } | |
347 | return 0; | |
348 | } | |
349 |