]>
Commit | Line | Data |
---|---|---|
92f5a8d4 TL |
1 | // |
2 | // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com) | |
3 | // | |
4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | |
5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
6 | // | |
7 | // Official repository: https://github.com/boostorg/beast | |
8 | // | |
9 | ||
10 | #ifndef BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP | |
11 | #define BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP | |
12 | ||
13 | #include <boost/beast/core/buffer_traits.hpp> | |
14 | #include <boost/beast/websocket/teardown.hpp> | |
15 | #include <boost/beast/websocket/detail/mask.hpp> | |
16 | #include <boost/beast/websocket/impl/stream_impl.hpp> | |
17 | #include <boost/beast/core/async_base.hpp> | |
18 | #include <boost/beast/core/bind_handler.hpp> | |
19 | #include <boost/beast/core/buffers_prefix.hpp> | |
20 | #include <boost/beast/core/buffers_suffix.hpp> | |
21 | #include <boost/beast/core/flat_static_buffer.hpp> | |
22 | #include <boost/beast/core/read_size.hpp> | |
23 | #include <boost/beast/core/stream_traits.hpp> | |
24 | #include <boost/beast/core/detail/bind_continuation.hpp> | |
25 | #include <boost/beast/core/detail/buffer.hpp> | |
26 | #include <boost/beast/core/detail/clamp.hpp> | |
27 | #include <boost/beast/core/detail/config.hpp> | |
28 | #include <boost/asio/coroutine.hpp> | |
29 | #include <boost/asio/post.hpp> | |
30 | #include <boost/assert.hpp> | |
31 | #include <boost/config.hpp> | |
32 | #include <boost/optional.hpp> | |
33 | #include <boost/throw_exception.hpp> | |
34 | #include <algorithm> | |
35 | #include <limits> | |
36 | #include <memory> | |
37 | ||
38 | namespace boost { | |
39 | namespace beast { | |
40 | namespace websocket { | |
41 | ||
42 | /* Read some message data into a buffer sequence. | |
43 | ||
44 | Also reads and handles control frames. | |
45 | */ | |
// Composed operation behind the buffer-sequence form of async_read_some.
// It is written as a stackless coroutine (it derives from asio::coroutine
// and its body sits inside BOOST_ASIO_CORO_REENTER), so operator() is
// re-entered once per completed intermediate step. Besides copying or
// inflating message payload into the caller's buffers, it answers pings
// with pongs, reports pongs through the control callback, and runs the
// close sequence when a close frame or a protocol failure is seen.
46 | template<class NextLayer, bool deflateSupported> | |
47 | template<class Handler, class MutableBufferSequence> | |
48 | class stream<NextLayer, deflateSupported>::read_some_op | |
49 | : public beast::async_base< | |
50 | Handler, beast::executor_type<stream>> | |
51 | , public asio::coroutine | |
52 | { | |
// wp_            weak reference to the stream implementation; locked on
//                every entry to operator().
// bs_            the caller's buffer sequence as passed in (used for the
//                UTF-8 check on the deflate path).
// cb_            suffix view over the same buffers; consumed as inflated
//                output is produced.
// bytes_written_ running byte count handed to the completion handler.
// result_        error recorded before jumping to `close`; used as the
//                final error when teardown itself reports none.
// code_          close code serialized into the close frame.
// did_read_      set after the deflate path has issued its socket read,
//                so that path reads at most once per operation.
53 | boost::weak_ptr<impl_type> wp_; | |
54 | MutableBufferSequence bs_; | |
55 | buffers_suffix<MutableBufferSequence> cb_; | |
56 | std::size_t bytes_written_ = 0; | |
57 | error_code result_; | |
58 | close_code code_; | |
59 | bool did_read_ = false; | |
60 | ||
61 | public: | |
62 | static constexpr int id = 1; // for soft_mutex | |
63 | ||
// Stores the handler and buffers, then immediately starts the
// operation by invoking itself with cont == false.
64 | template<class Handler_> | |
65 | read_some_op( | |
66 | Handler_&& h, | |
67 | boost::shared_ptr<impl_type> const& sp, | |
68 | MutableBufferSequence const& bs) | |
69 | : async_base< | |
70 | Handler, beast::executor_type<stream>>( | |
71 | std::forward<Handler_>(h), | |
72 | sp->stream().get_executor()) | |
73 | , wp_(sp) | |
74 | , bs_(bs) | |
75 | , cb_(bs) | |
76 | , code_(close_code::none) | |
77 | { | |
78 | (*this)({}, 0, false); | |
79 | } | |
80 | ||
// Coroutine body / completion handler for every intermediate step.
//   ec                 result of the step that just completed
//   bytes_transferred  bytes moved by that step
//   cont               false only for the initiating call made from the
//                      constructor; forwarded to complete().
81 | void operator()( | |
82 | error_code ec = {}, | |
83 | std::size_t bytes_transferred = 0, | |
84 | bool cont = true) | |
85 | { | |
86 | using beast::detail::clamp; | |
87 | auto sp = wp_.lock(); | |
// The implementation object is gone: finish with operation_aborted
// and a byte count of zero.
88 | if(! sp) | |
89 | { | |
90 | ec = net::error::operation_aborted; | |
91 | bytes_written_ = 0; | |
92 | return this->complete(cont, ec, bytes_written_); | |
93 | } | |
94 | auto& impl = *sp; | |
95 | BOOST_ASIO_CORO_REENTER(*this) | |
96 | { | |
97 | impl.update_timer(this->get_executor()); | |
98 | ||
99 | acquire_read_lock: | |
100 | // Acquire the read lock | |
101 | if(! impl.rd_block.try_lock(this)) | |
102 | { | |
103 | do_suspend: | |
104 | BOOST_ASIO_CORO_YIELD | |
20effc67 TL |
105 | { |
106 | BOOST_ASIO_HANDLER_LOCATION(( | |
107 | __FILE__, __LINE__, | |
108 | "websocket::async_read_some")); | |
109 | ||
110 | impl.op_r_rd.emplace(std::move(*this)); | |
111 | } | |
92f5a8d4 TL |
112 | impl.rd_block.lock(this); |
113 | BOOST_ASIO_CORO_YIELD | |
20effc67 TL |
114 | { |
115 | BOOST_ASIO_HANDLER_LOCATION(( | |
116 | __FILE__, __LINE__, | |
117 | "websocket::async_read_some")); | |
118 | ||
119 | net::post(std::move(*this)); | |
120 | } | |
92f5a8d4 TL |
121 | BOOST_ASSERT(impl.rd_block.is_locked(this)); |
122 | ||
20effc67 | 123 | BOOST_ASSERT(!ec); |
92f5a8d4 TL |
124 | if(impl.check_stop_now(ec)) |
125 | { | |
126 | BOOST_ASSERT(ec == net::error::operation_aborted); | |
127 | goto upcall; | |
128 | } | |
129 | // VFALCO Should never get here | |
130 | ||
131 | // The only way to get read blocked is if | |
132 | // a `close_op` wrote a close frame | |
133 | BOOST_ASSERT(impl.wr_close); | |
134 | BOOST_ASSERT(impl.status_ != status::open); | |
135 | ec = net::error::operation_aborted; | |
136 | goto upcall; | |
137 | } | |
138 | else | |
139 | { | |
140 | // Make sure the stream is not closed | |
141 | if( impl.status_ == status::closed || | |
142 | impl.status_ == status::failed) | |
143 | { | |
144 | ec = net::error::operation_aborted; | |
145 | goto upcall; | |
146 | } | |
147 | } | |
148 | ||
149 | // if status_ == status::closing, we want to suspend | |
150 | // the read operation until the close completes, | |
151 | // then finish the read with operation_aborted. | |
152 | ||
153 | loop: | |
154 | BOOST_ASSERT(impl.rd_block.is_locked(this)); | |
155 | // See if we need to read a frame header. This | |
156 | // condition is structured to give the decompressor | |
157 | // a chance to emit the final empty deflate block | |
158 | // | |
159 | if(impl.rd_remain == 0 && | |
160 | (! impl.rd_fh.fin || impl.rd_done)) | |
161 | { | |
162 | // Read frame header | |
163 | while(! impl.parse_fh( | |
164 | impl.rd_fh, impl.rd_buf, result_)) | |
165 | { | |
166 | if(result_) | |
167 | { | |
168 | // _Fail the WebSocket Connection_ | |
169 | if(result_ == error::message_too_big) | |
170 | code_ = close_code::too_big; | |
171 | else | |
172 | code_ = close_code::protocol_error; | |
173 | goto close; | |
174 | } | |
175 | BOOST_ASSERT(impl.rd_block.is_locked(this)); | |
176 | BOOST_ASIO_CORO_YIELD | |
20effc67 TL |
177 | { |
178 | BOOST_ASIO_HANDLER_LOCATION(( | |
179 | __FILE__, __LINE__, | |
180 | "websocket::async_read_some")); | |
181 | ||
182 | impl.stream().async_read_some( | |
183 | impl.rd_buf.prepare(read_size( | |
184 | impl.rd_buf, impl.rd_buf.max_size())), | |
185 | std::move(*this)); | |
186 | } | |
92f5a8d4 TL |
187 | BOOST_ASSERT(impl.rd_block.is_locked(this)); |
188 | impl.rd_buf.commit(bytes_transferred); | |
189 | if(impl.check_stop_now(ec)) | |
190 | goto upcall; | |
191 | impl.reset_idle(); | |
192 | ||
193 | // Allow a close operation | |
194 | // to acquire the read block | |
195 | impl.rd_block.unlock(this); | |
196 | if( impl.op_r_close.maybe_invoke()) | |
197 | { | |
198 | // Suspend | |
199 | BOOST_ASSERT(impl.rd_block.is_locked()); | |
200 | goto do_suspend; | |
201 | } | |
202 | // Acquire read block | |
203 | impl.rd_block.lock(this); | |
204 | } | |
205 | // Immediately apply the mask to the portion | |
206 | // of the buffer holding payload data. | |
207 | if(impl.rd_fh.len > 0 && impl.rd_fh.mask) | |
208 | detail::mask_inplace(buffers_prefix( | |
209 | clamp(impl.rd_fh.len), | |
210 | impl.rd_buf.data()), | |
211 | impl.rd_key); | |
212 | if(detail::is_control(impl.rd_fh.op)) | |
213 | { | |
214 | // Clear this otherwise the next | |
215 | // frame will be considered final. | |
216 | impl.rd_fh.fin = false; | |
217 | ||
218 | // Handle ping frame | |
219 | if(impl.rd_fh.op == detail::opcode::ping) | |
220 | { | |
// When a control callback is installed and we are still inside the
// initiating call (cont == false), post first so that the callback
// is not run from within the initiating function.
221 | if(impl.ctrl_cb) | |
222 | { | |
223 | if(! cont) | |
224 | { | |
225 | BOOST_ASIO_CORO_YIELD | |
20effc67 TL |
226 | { |
227 | BOOST_ASIO_HANDLER_LOCATION(( | |
228 | __FILE__, __LINE__, | |
229 | "websocket::async_read_some")); | |
230 | ||
231 | net::post(std::move(*this)); | |
232 | } | |
92f5a8d4 TL |
233 | BOOST_ASSERT(cont); |
234 | // VFALCO call check_stop_now() here? | |
235 | } | |
236 | } | |
237 | { | |
238 | auto const b = buffers_prefix( | |
239 | clamp(impl.rd_fh.len), | |
240 | impl.rd_buf.data()); | |
241 | auto const len = buffer_bytes(b); | |
242 | BOOST_ASSERT(len == impl.rd_fh.len); | |
243 | ping_data payload; | |
244 | detail::read_ping(payload, b); | |
245 | impl.rd_buf.consume(len); | |
246 | // Ignore ping when closing | |
247 | if(impl.status_ == status::closing) | |
248 | goto loop; | |
249 | if(impl.ctrl_cb) | |
250 | impl.ctrl_cb( | |
251 | frame_type::ping, payload); | |
252 | impl.rd_fb.clear(); | |
253 | impl.template write_ping< | |
254 | flat_static_buffer_base>(impl.rd_fb, | |
255 | detail::opcode::pong, payload); | |
256 | } | |
257 | ||
258 | // Allow a close operation | |
259 | // to acquire the read block | |
260 | impl.rd_block.unlock(this); | |
261 | impl.op_r_close.maybe_invoke(); | |
262 | ||
263 | // Acquire the write lock | |
264 | if(! impl.wr_block.try_lock(this)) | |
265 | { | |
266 | BOOST_ASIO_CORO_YIELD | |
20effc67 TL |
267 | { |
268 | BOOST_ASIO_HANDLER_LOCATION(( | |
269 | __FILE__, __LINE__, | |
270 | "websocket::async_read_some")); | |
271 | ||
272 | impl.op_rd.emplace(std::move(*this)); | |
273 | } | |
92f5a8d4 TL |
274 | impl.wr_block.lock(this); |
275 | BOOST_ASIO_CORO_YIELD | |
20effc67 TL |
276 | { |
277 | BOOST_ASIO_HANDLER_LOCATION(( | |
278 | __FILE__, __LINE__, | |
279 | "websocket::async_read_some")); | |
280 | ||
281 | net::post(std::move(*this)); | |
282 | } | |
92f5a8d4 TL |
283 | BOOST_ASSERT(impl.wr_block.is_locked(this)); |
284 | if(impl.check_stop_now(ec)) | |
285 | goto upcall; | |
286 | } | |
287 | ||
288 | // Send pong | |
289 | BOOST_ASSERT(impl.wr_block.is_locked(this)); | |
290 | BOOST_ASIO_CORO_YIELD | |
20effc67 TL |
291 | { |
292 | BOOST_ASIO_HANDLER_LOCATION(( | |
293 | __FILE__, __LINE__, | |
294 | "websocket::async_read_some")); | |
295 | ||
296 | net::async_write( | |
1e59de90 | 297 | impl.stream(), net::const_buffer(impl.rd_fb.data()), |
20effc67 TL |
298 | beast::detail::bind_continuation(std::move(*this))); |
299 | } | |
92f5a8d4 TL |
300 | BOOST_ASSERT(impl.wr_block.is_locked(this)); |
301 | if(impl.check_stop_now(ec)) | |
302 | goto upcall; | |
303 | impl.wr_block.unlock(this); | |
304 | impl.op_close.maybe_invoke() | |
305 | || impl.op_idle_ping.maybe_invoke() | |
306 | || impl.op_ping.maybe_invoke() | |
307 | || impl.op_wr.maybe_invoke(); | |
308 | goto acquire_read_lock; | |
309 | } | |
310 | ||
311 | // Handle pong frame | |
312 | if(impl.rd_fh.op == detail::opcode::pong) | |
313 | { | |
314 | // Ignore pong when closing | |
315 | if(! impl.wr_close && impl.ctrl_cb) | |
316 | { | |
317 | if(! cont) | |
318 | { | |
319 | BOOST_ASIO_CORO_YIELD | |
20effc67 TL |
320 | { |
321 | BOOST_ASIO_HANDLER_LOCATION(( | |
322 | __FILE__, __LINE__, | |
323 | "websocket::async_read_some")); | |
324 | ||
325 | net::post(std::move(*this)); | |
326 | } | |
92f5a8d4 TL |
327 | BOOST_ASSERT(cont); |
328 | } | |
329 | } | |
330 | auto const cb = buffers_prefix(clamp( | |
331 | impl.rd_fh.len), impl.rd_buf.data()); | |
332 | auto const len = buffer_bytes(cb); | |
333 | BOOST_ASSERT(len == impl.rd_fh.len); | |
334 | ping_data payload; | |
335 | detail::read_ping(payload, cb); | |
336 | impl.rd_buf.consume(len); | |
337 | // Ignore pong when closing | |
338 | if(! impl.wr_close && impl.ctrl_cb) | |
339 | impl.ctrl_cb(frame_type::pong, payload); | |
340 | goto loop; | |
341 | } | |
342 | ||
343 | // Handle close frame | |
344 | BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close); | |
345 | { | |
346 | if(impl.ctrl_cb) | |
347 | { | |
348 | if(! cont) | |
349 | { | |
350 | BOOST_ASIO_CORO_YIELD | |
20effc67 TL |
351 | { |
352 | BOOST_ASIO_HANDLER_LOCATION(( | |
353 | __FILE__, __LINE__, | |
354 | "websocket::async_read_some")); | |
355 | ||
356 | net::post(std::move(*this)); | |
357 | } | |
92f5a8d4 TL |
358 | BOOST_ASSERT(cont); |
359 | } | |
360 | } | |
361 | auto const cb = buffers_prefix(clamp( | |
362 | impl.rd_fh.len), impl.rd_buf.data()); | |
363 | auto const len = buffer_bytes(cb); | |
364 | BOOST_ASSERT(len == impl.rd_fh.len); | |
365 | BOOST_ASSERT(! impl.rd_close); | |
366 | impl.rd_close = true; | |
367 | close_reason cr; | |
368 | detail::read_close(cr, cb, result_); | |
369 | if(result_) | |
370 | { | |
371 | // _Fail the WebSocket Connection_ | |
372 | code_ = close_code::protocol_error; | |
373 | goto close; | |
374 | } | |
375 | impl.cr = cr; | |
376 | impl.rd_buf.consume(len); | |
377 | if(impl.ctrl_cb) | |
378 | impl.ctrl_cb(frame_type::close, | |
379 | impl.cr.reason); | |
380 | // See if we are already closing | |
381 | if(impl.status_ == status::closing) | |
382 | { | |
383 | // _Close the WebSocket Connection_ | |
384 | BOOST_ASSERT(impl.wr_close); | |
385 | code_ = close_code::none; | |
386 | result_ = error::closed; | |
387 | goto close; | |
388 | } | |
389 | // _Start the WebSocket Closing Handshake_ | |
390 | code_ = cr.code == close_code::none ? | |
391 | close_code::normal : | |
392 | static_cast<close_code>(cr.code); | |
393 | result_ = error::closed; | |
394 | goto close; | |
395 | } | |
396 | } | |
397 | if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin) | |
398 | { | |
399 | // Empty non-final frame | |
400 | goto loop; | |
401 | } | |
402 | impl.rd_done = false; | |
403 | } | |
// Uncompressed payload: either copy bytes already sitting in rd_buf
// or read straight from the stream into the caller's buffers.
404 | if(! impl.rd_deflated()) | |
405 | { | |
406 | if(impl.rd_remain > 0) | |
407 | { | |
408 | if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() > | |
409 | (std::min)(clamp(impl.rd_remain), | |
410 | buffer_bytes(cb_))) | |
411 | { | |
412 | // Fill the read buffer first, otherwise we | |
413 | // get fewer bytes at the cost of one I/O. | |
414 | BOOST_ASIO_CORO_YIELD | |
20effc67 TL |
415 | { |
416 | BOOST_ASIO_HANDLER_LOCATION(( | |
417 | __FILE__, __LINE__, | |
418 | "websocket::async_read_some")); | |
419 | ||
420 | impl.stream().async_read_some( | |
421 | impl.rd_buf.prepare(read_size( | |
422 | impl.rd_buf, impl.rd_buf.max_size())), | |
423 | std::move(*this)); | |
424 | } | |
92f5a8d4 TL |
425 | impl.rd_buf.commit(bytes_transferred); |
426 | if(impl.check_stop_now(ec)) | |
427 | goto upcall; | |
428 | impl.reset_idle(); | |
429 | if(impl.rd_fh.mask) | |
430 | detail::mask_inplace(buffers_prefix(clamp( | |
431 | impl.rd_remain), impl.rd_buf.data()), | |
432 | impl.rd_key); | |
433 | } | |
434 | if(impl.rd_buf.size() > 0) | |
435 | { | |
436 | // Copy from the read buffer. | |
437 | // The mask was already applied. | |
438 | bytes_transferred = net::buffer_copy(cb_, | |
439 | impl.rd_buf.data(), clamp(impl.rd_remain)); | |
440 | auto const mb = buffers_prefix( | |
441 | bytes_transferred, cb_); | |
442 | impl.rd_remain -= bytes_transferred; | |
443 | if(impl.rd_op == detail::opcode::text) | |
444 | { | |
445 | if(! impl.rd_utf8.write(mb) || | |
446 | (impl.rd_remain == 0 && impl.rd_fh.fin && | |
447 | ! impl.rd_utf8.finish())) | |
448 | { | |
449 | // _Fail the WebSocket Connection_ | |
450 | code_ = close_code::bad_payload; | |
451 | result_ = error::bad_frame_payload; | |
452 | goto close; | |
453 | } | |
454 | } | |
455 | bytes_written_ += bytes_transferred; | |
456 | impl.rd_size += bytes_transferred; | |
457 | impl.rd_buf.consume(bytes_transferred); | |
458 | } | |
459 | else | |
460 | { | |
461 | // Read into caller's buffer | |
462 | BOOST_ASSERT(impl.rd_remain > 0); | |
463 | BOOST_ASSERT(buffer_bytes(cb_) > 0); | |
464 | BOOST_ASSERT(buffer_bytes(buffers_prefix( | |
465 | clamp(impl.rd_remain), cb_)) > 0); | |
466 | BOOST_ASIO_CORO_YIELD | |
20effc67 TL |
467 | { |
468 | BOOST_ASIO_HANDLER_LOCATION(( | |
469 | __FILE__, __LINE__, | |
470 | "websocket::async_read_some")); | |
471 | ||
472 | impl.stream().async_read_some(buffers_prefix( | |
473 | clamp(impl.rd_remain), cb_), std::move(*this)); | |
474 | } | |
92f5a8d4 TL |
475 | if(impl.check_stop_now(ec)) |
476 | goto upcall; | |
477 | impl.reset_idle(); | |
478 | BOOST_ASSERT(bytes_transferred > 0); | |
479 | auto const mb = buffers_prefix( | |
480 | bytes_transferred, cb_); | |
481 | impl.rd_remain -= bytes_transferred; | |
482 | if(impl.rd_fh.mask) | |
483 | detail::mask_inplace(mb, impl.rd_key); | |
484 | if(impl.rd_op == detail::opcode::text) | |
485 | { | |
486 | if(! impl.rd_utf8.write(mb) || | |
487 | (impl.rd_remain == 0 && impl.rd_fh.fin && | |
488 | ! impl.rd_utf8.finish())) | |
489 | { | |
490 | // _Fail the WebSocket Connection_ | |
491 | code_ = close_code::bad_payload; | |
492 | result_ = error::bad_frame_payload; | |
493 | goto close; | |
494 | } | |
495 | } | |
496 | bytes_written_ += bytes_transferred; | |
497 | impl.rd_size += bytes_transferred; | |
498 | } | |
499 | } | |
f67539c2 TL |
500 | BOOST_ASSERT( ! impl.rd_done ); |
501 | if( impl.rd_remain == 0 && impl.rd_fh.fin ) | |
502 | impl.rd_done = true; | |
92f5a8d4 TL |
503 | } |
504 | else | |
505 | { | |
506 | // Read compressed message frame payload: | |
507 | // inflate even if rd_fh_.len == 0, otherwise we | |
508 | // never emit the end-of-stream deflate block. | |
509 | while(buffer_bytes(cb_) > 0) | |
510 | { | |
// did_read_ gates this branch, so the stream is read at most once
// here per read_some_op.
511 | if( impl.rd_remain > 0 && | |
512 | impl.rd_buf.size() == 0 && | |
513 | ! did_read_) | |
514 | { | |
515 | // read new | |
516 | BOOST_ASIO_CORO_YIELD | |
20effc67 TL |
517 | { |
518 | BOOST_ASIO_HANDLER_LOCATION(( | |
519 | __FILE__, __LINE__, | |
520 | "websocket::async_read_some")); | |
521 | ||
522 | impl.stream().async_read_some( | |
523 | impl.rd_buf.prepare(read_size( | |
524 | impl.rd_buf, impl.rd_buf.max_size())), | |
525 | std::move(*this)); | |
526 | } | |
92f5a8d4 TL |
527 | if(impl.check_stop_now(ec)) |
528 | goto upcall; | |
529 | impl.reset_idle(); | |
530 | BOOST_ASSERT(bytes_transferred > 0); | |
531 | impl.rd_buf.commit(bytes_transferred); | |
532 | if(impl.rd_fh.mask) | |
533 | detail::mask_inplace( | |
534 | buffers_prefix(clamp(impl.rd_remain), | |
535 | impl.rd_buf.data()), impl.rd_key); | |
536 | did_read_ = true; | |
537 | } | |
538 | zlib::z_params zs; | |
539 | { | |
540 | auto const out = buffers_front(cb_); | |
541 | zs.next_out = out.data(); | |
542 | zs.avail_out = out.size(); | |
543 | BOOST_ASSERT(zs.avail_out > 0); | |
544 | } | |
1e59de90 TL |
545 | // boolean to track the end of the message. |
546 | bool fin = false; | |
92f5a8d4 TL |
547 | if(impl.rd_remain > 0) |
548 | { | |
549 | if(impl.rd_buf.size() > 0) | |
550 | { | |
551 | // use what's there | |
552 | auto const in = buffers_prefix( | |
553 | clamp(impl.rd_remain), buffers_front( | |
554 | impl.rd_buf.data())); | |
555 | zs.avail_in = in.size(); | |
556 | zs.next_in = in.data(); | |
557 | } | |
558 | else | |
559 | { | |
560 | break; | |
561 | } | |
562 | } | |
563 | else if(impl.rd_fh.fin) | |
564 | { | |
565 | // append the empty block codes | |
1e59de90 | 566 | static std::uint8_t constexpr |
92f5a8d4 TL |
567 | empty_block[4] = { 0x00, 0x00, 0xff, 0xff }; |
568 | zs.next_in = empty_block; | |
569 | zs.avail_in = sizeof(empty_block); | |
1e59de90 | 570 | fin = true; |
92f5a8d4 TL |
571 | } |
572 | else | |
573 | { | |
574 | break; | |
575 | } | |
576 | impl.inflate(zs, zlib::Flush::sync, ec); | |
577 | if(impl.check_stop_now(ec)) | |
578 | goto upcall; | |
// The trailing empty block produced no further output: the message
// is complete.
1e59de90 TL |
579 | if(fin && zs.total_out == 0) { |
580 | impl.do_context_takeover_read(impl.role); | |
581 | impl.rd_done = true; | |
582 | break; | |
583 | } | |
92f5a8d4 TL |
584 | if(impl.rd_msg_max && beast::detail::sum_exceeds( |
585 | impl.rd_size, zs.total_out, impl.rd_msg_max)) | |
586 | { | |
587 | // _Fail the WebSocket Connection_ | |
588 | code_ = close_code::too_big; | |
589 | result_ = error::message_too_big; | |
590 | goto close; | |
591 | } | |
592 | cb_.consume(zs.total_out); | |
593 | impl.rd_size += zs.total_out; | |
// When fin is set the input was the static empty_block, not bytes
// from rd_buf, so nothing is consumed from the read buffer.
1e59de90 TL |
594 | if (! fin) { |
595 | impl.rd_remain -= zs.total_in; | |
596 | impl.rd_buf.consume(zs.total_in); | |
597 | } | |
92f5a8d4 TL |
598 | bytes_written_ += zs.total_out; |
599 | } | |
600 | if(impl.rd_op == detail::opcode::text) | |
601 | { | |
602 | // check utf8 | |
603 | if(! impl.rd_utf8.write( | |
604 | buffers_prefix(bytes_written_, bs_)) || ( | |
605 | impl.rd_done && ! impl.rd_utf8.finish())) | |
606 | { | |
607 | // _Fail the WebSocket Connection_ | |
608 | code_ = close_code::bad_payload; | |
609 | result_ = error::bad_frame_payload; | |
610 | goto close; | |
611 | } | |
612 | } | |
613 | } | |
614 | goto upcall; | |
615 | ||
// Close path, entered with code_ (and usually result_) already set:
// take the write block, mark the stream as closing, send a close
// frame carrying code_ unless one was already written (wr_close),
// tear the stream down, then record the final status.
616 | close: | |
617 | // Acquire the write lock | |
618 | if(! impl.wr_block.try_lock(this)) | |
619 | { | |
620 | BOOST_ASIO_CORO_YIELD | |
20effc67 TL |
621 | { |
622 | BOOST_ASIO_HANDLER_LOCATION(( | |
623 | __FILE__, __LINE__, | |
624 | "websocket::async_read_some")); | |
625 | ||
626 | impl.op_rd.emplace(std::move(*this)); | |
627 | } | |
92f5a8d4 TL |
628 | impl.wr_block.lock(this); |
629 | BOOST_ASIO_CORO_YIELD | |
20effc67 TL |
630 | { |
631 | BOOST_ASIO_HANDLER_LOCATION(( | |
632 | __FILE__, __LINE__, | |
633 | "websocket::async_read_some")); | |
634 | ||
635 | net::post(std::move(*this)); | |
636 | } | |
92f5a8d4 TL |
637 | BOOST_ASSERT(impl.wr_block.is_locked(this)); |
638 | if(impl.check_stop_now(ec)) | |
639 | goto upcall; | |
640 | } | |
641 | ||
642 | impl.change_status(status::closing); | |
643 | ||
644 | if(! impl.wr_close) | |
645 | { | |
646 | impl.wr_close = true; | |
647 | ||
648 | // Serialize close frame | |
649 | impl.rd_fb.clear(); | |
650 | impl.template write_close< | |
651 | flat_static_buffer_base>( | |
652 | impl.rd_fb, code_); | |
653 | ||
654 | // Send close frame | |
655 | BOOST_ASSERT(impl.wr_block.is_locked(this)); | |
656 | BOOST_ASIO_CORO_YIELD | |
20effc67 TL |
657 | { |
658 | BOOST_ASIO_HANDLER_LOCATION(( | |
659 | __FILE__, __LINE__, | |
660 | "websocket::async_read_some")); | |
661 | ||
1e59de90 | 662 | net::async_write(impl.stream(), net::const_buffer(impl.rd_fb.data()), |
20effc67 TL |
663 | beast::detail::bind_continuation(std::move(*this))); |
664 | } | |
92f5a8d4 TL |
665 | BOOST_ASSERT(impl.wr_block.is_locked(this)); |
666 | if(impl.check_stop_now(ec)) | |
667 | goto upcall; | |
668 | } | |
669 | ||
670 | // Teardown | |
671 | using beast::websocket::async_teardown; | |
672 | BOOST_ASSERT(impl.wr_block.is_locked(this)); | |
673 | BOOST_ASIO_CORO_YIELD | |
20effc67 TL |
674 | { |
675 | BOOST_ASIO_HANDLER_LOCATION(( | |
676 | __FILE__, __LINE__, | |
677 | "websocket::async_read_some")); | |
678 | ||
679 | async_teardown(impl.role, impl.stream(), | |
680 | beast::detail::bind_continuation(std::move(*this))); | |
681 | } | |
92f5a8d4 TL |
682 | BOOST_ASSERT(impl.wr_block.is_locked(this)); |
683 | if(ec == net::error::eof) | |
684 | { | |
685 | // Rationale: | |
686 | // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error | |
687 | ec = {}; | |
688 | } | |
// A teardown error takes precedence; otherwise report the error
// that sent us down the close path.
689 | if(! ec) | |
690 | ec = result_; | |
691 | if(ec && ec != error::closed) | |
692 | impl.change_status(status::failed); | |
693 | else | |
694 | impl.change_status(status::closed); | |
695 | impl.close(); | |
696 | ||
// Common exit: release whichever of the read/write blocks this op
// still holds, call maybe_invoke() on the pending operations
// (presumably resuming one that was suspended - confirm against
// the op storage type), then complete with ec and bytes_written_.
697 | upcall: | |
698 | impl.rd_block.try_unlock(this); | |
699 | impl.op_r_close.maybe_invoke(); | |
700 | if(impl.wr_block.try_unlock(this)) | |
701 | impl.op_close.maybe_invoke() | |
702 | || impl.op_idle_ping.maybe_invoke() | |
703 | || impl.op_ping.maybe_invoke() | |
704 | || impl.op_wr.maybe_invoke(); | |
705 | this->complete(cont, ec, bytes_written_); | |
706 | } | |
707 | } | |
708 | }; | |
709 | ||
710 | //------------------------------------------------------------------------------ | |
711 | ||
712 | template<class NextLayer, bool deflateSupported> | |
713 | template<class Handler, class DynamicBuffer> | |
714 | class stream<NextLayer, deflateSupported>::read_op | |
715 | : public beast::async_base< | |
716 | Handler, beast::executor_type<stream>> | |
717 | , public asio::coroutine | |
718 | { | |
719 | boost::weak_ptr<impl_type> wp_; | |
720 | DynamicBuffer& b_; | |
721 | std::size_t limit_; | |
722 | std::size_t bytes_written_ = 0; | |
723 | bool some_; | |
724 | ||
725 | public: | |
726 | template<class Handler_> | |
727 | read_op( | |
728 | Handler_&& h, | |
729 | boost::shared_ptr<impl_type> const& sp, | |
730 | DynamicBuffer& b, | |
731 | std::size_t limit, | |
732 | bool some) | |
733 | : async_base<Handler, | |
734 | beast::executor_type<stream>>( | |
735 | std::forward<Handler_>(h), | |
736 | sp->stream().get_executor()) | |
737 | , wp_(sp) | |
738 | , b_(b) | |
739 | , limit_(limit ? limit : ( | |
740 | std::numeric_limits<std::size_t>::max)()) | |
741 | , some_(some) | |
742 | { | |
743 | (*this)({}, 0, false); | |
744 | } | |
745 | ||
746 | void operator()( | |
747 | error_code ec = {}, | |
748 | std::size_t bytes_transferred = 0, | |
749 | bool cont = true) | |
750 | { | |
751 | using beast::detail::clamp; | |
752 | auto sp = wp_.lock(); | |
753 | if(! sp) | |
754 | { | |
755 | ec = net::error::operation_aborted; | |
756 | bytes_written_ = 0; | |
757 | return this->complete(cont, ec, bytes_written_); | |
758 | } | |
759 | auto& impl = *sp; | |
760 | using mutable_buffers_type = typename | |
761 | DynamicBuffer::mutable_buffers_type; | |
762 | BOOST_ASIO_CORO_REENTER(*this) | |
763 | { | |
764 | do | |
765 | { | |
766 | // VFALCO TODO use boost::beast::bind_continuation | |
767 | BOOST_ASIO_CORO_YIELD | |
768 | { | |
769 | auto mb = beast::detail::dynamic_buffer_prepare(b_, | |
770 | clamp(impl.read_size_hint_db(b_), limit_), | |
771 | ec, error::buffer_overflow); | |
772 | if(impl.check_stop_now(ec)) | |
773 | goto upcall; | |
20effc67 TL |
774 | |
775 | BOOST_ASIO_HANDLER_LOCATION(( | |
776 | __FILE__, __LINE__, | |
777 | "websocket::async_read")); | |
778 | ||
92f5a8d4 TL |
779 | read_some_op<read_op, mutable_buffers_type>( |
780 | std::move(*this), sp, *mb); | |
781 | } | |
782 | ||
783 | b_.commit(bytes_transferred); | |
784 | bytes_written_ += bytes_transferred; | |
785 | if(ec) | |
786 | goto upcall; | |
787 | } | |
788 | while(! some_ && ! impl.rd_done); | |
789 | ||
790 | upcall: | |
791 | this->complete(cont, ec, bytes_written_); | |
792 | } | |
793 | } | |
794 | }; | |
795 | ||
796 | template<class NextLayer, bool deflateSupported> | |
797 | struct stream<NextLayer, deflateSupported>:: | |
798 | run_read_some_op | |
799 | { | |
800 | template< | |
801 | class ReadHandler, | |
802 | class MutableBufferSequence> | |
803 | void | |
804 | operator()( | |
805 | ReadHandler&& h, | |
806 | boost::shared_ptr<impl_type> const& sp, | |
807 | MutableBufferSequence const& b) | |
808 | { | |
809 | // If you get an error on the following line it means | |
810 | // that your handler does not meet the documented type | |
811 | // requirements for the handler. | |
812 | ||
813 | static_assert( | |
814 | beast::detail::is_invocable<ReadHandler, | |
815 | void(error_code, std::size_t)>::value, | |
816 | "ReadHandler type requirements not met"); | |
817 | ||
818 | read_some_op< | |
819 | typename std::decay<ReadHandler>::type, | |
820 | MutableBufferSequence>( | |
821 | std::forward<ReadHandler>(h), | |
822 | sp, | |
823 | b); | |
824 | } | |
825 | }; | |
826 | ||
827 | template<class NextLayer, bool deflateSupported> | |
828 | struct stream<NextLayer, deflateSupported>:: | |
829 | run_read_op | |
830 | { | |
831 | template< | |
832 | class ReadHandler, | |
833 | class DynamicBuffer> | |
834 | void | |
835 | operator()( | |
836 | ReadHandler&& h, | |
837 | boost::shared_ptr<impl_type> const& sp, | |
838 | DynamicBuffer* b, | |
839 | std::size_t limit, | |
840 | bool some) | |
841 | { | |
842 | // If you get an error on the following line it means | |
843 | // that your handler does not meet the documented type | |
844 | // requirements for the handler. | |
845 | ||
846 | static_assert( | |
847 | beast::detail::is_invocable<ReadHandler, | |
848 | void(error_code, std::size_t)>::value, | |
849 | "ReadHandler type requirements not met"); | |
850 | ||
851 | read_op< | |
852 | typename std::decay<ReadHandler>::type, | |
853 | DynamicBuffer>( | |
854 | std::forward<ReadHandler>(h), | |
855 | sp, | |
856 | *b, | |
857 | limit, | |
858 | some); | |
859 | } | |
860 | }; | |
861 | ||
862 | //------------------------------------------------------------------------------ | |
863 | ||
864 | template<class NextLayer, bool deflateSupported> | |
865 | template<class DynamicBuffer> | |
866 | std::size_t | |
867 | stream<NextLayer, deflateSupported>:: | |
868 | read(DynamicBuffer& buffer) | |
869 | { | |
870 | static_assert(is_sync_stream<next_layer_type>::value, | |
871 | "SyncStream type requirements not met"); | |
872 | static_assert( | |
873 | net::is_dynamic_buffer<DynamicBuffer>::value, | |
874 | "DynamicBuffer type requirements not met"); | |
875 | error_code ec; | |
876 | auto const bytes_written = read(buffer, ec); | |
877 | if(ec) | |
878 | BOOST_THROW_EXCEPTION(system_error{ec}); | |
879 | return bytes_written; | |
880 | } | |
881 | ||
882 | template<class NextLayer, bool deflateSupported> | |
883 | template<class DynamicBuffer> | |
884 | std::size_t | |
885 | stream<NextLayer, deflateSupported>:: | |
886 | read(DynamicBuffer& buffer, error_code& ec) | |
887 | { | |
888 | static_assert(is_sync_stream<next_layer_type>::value, | |
889 | "SyncStream type requirements not met"); | |
890 | static_assert( | |
891 | net::is_dynamic_buffer<DynamicBuffer>::value, | |
892 | "DynamicBuffer type requirements not met"); | |
893 | std::size_t bytes_written = 0; | |
894 | do | |
895 | { | |
896 | bytes_written += read_some(buffer, 0, ec); | |
897 | if(ec) | |
898 | return bytes_written; | |
899 | } | |
900 | while(! is_message_done()); | |
901 | return bytes_written; | |
902 | } | |
903 | ||
904 | template<class NextLayer, bool deflateSupported> | |
20effc67 | 905 | template<class DynamicBuffer, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler> |
92f5a8d4 TL |
906 | BOOST_BEAST_ASYNC_RESULT2(ReadHandler) |
907 | stream<NextLayer, deflateSupported>:: | |
908 | async_read(DynamicBuffer& buffer, ReadHandler&& handler) | |
909 | { | |
910 | static_assert(is_async_stream<next_layer_type>::value, | |
911 | "AsyncStream type requirements not met"); | |
912 | static_assert( | |
913 | net::is_dynamic_buffer<DynamicBuffer>::value, | |
914 | "DynamicBuffer type requirements not met"); | |
915 | return net::async_initiate< | |
916 | ReadHandler, | |
917 | void(error_code, std::size_t)>( | |
918 | run_read_op{}, | |
919 | handler, | |
920 | impl_, | |
921 | &buffer, | |
922 | 0, | |
923 | false); | |
924 | } | |
925 | ||
926 | //------------------------------------------------------------------------------ | |
927 | ||
928 | template<class NextLayer, bool deflateSupported> | |
929 | template<class DynamicBuffer> | |
930 | std::size_t | |
931 | stream<NextLayer, deflateSupported>:: | |
932 | read_some( | |
933 | DynamicBuffer& buffer, | |
934 | std::size_t limit) | |
935 | { | |
936 | static_assert(is_sync_stream<next_layer_type>::value, | |
937 | "SyncStream type requirements not met"); | |
938 | static_assert( | |
939 | net::is_dynamic_buffer<DynamicBuffer>::value, | |
940 | "DynamicBuffer type requirements not met"); | |
941 | error_code ec; | |
942 | auto const bytes_written = | |
943 | read_some(buffer, limit, ec); | |
944 | if(ec) | |
945 | BOOST_THROW_EXCEPTION(system_error{ec}); | |
946 | return bytes_written; | |
947 | } | |
948 | ||
949 | template<class NextLayer, bool deflateSupported> | |
950 | template<class DynamicBuffer> | |
951 | std::size_t | |
952 | stream<NextLayer, deflateSupported>:: | |
953 | read_some( | |
954 | DynamicBuffer& buffer, | |
955 | std::size_t limit, | |
956 | error_code& ec) | |
957 | { | |
958 | static_assert(is_sync_stream<next_layer_type>::value, | |
959 | "SyncStream type requirements not met"); | |
960 | static_assert( | |
961 | net::is_dynamic_buffer<DynamicBuffer>::value, | |
962 | "DynamicBuffer type requirements not met"); | |
963 | using beast::detail::clamp; | |
964 | if(! limit) | |
965 | limit = (std::numeric_limits<std::size_t>::max)(); | |
966 | auto const size = | |
967 | clamp(read_size_hint(buffer), limit); | |
968 | BOOST_ASSERT(size > 0); | |
969 | auto mb = beast::detail::dynamic_buffer_prepare( | |
970 | buffer, size, ec, error::buffer_overflow); | |
971 | if(impl_->check_stop_now(ec)) | |
972 | return 0; | |
973 | auto const bytes_written = read_some(*mb, ec); | |
974 | buffer.commit(bytes_written); | |
975 | return bytes_written; | |
976 | } | |
977 | ||
978 | template<class NextLayer, bool deflateSupported> | |
20effc67 | 979 | template<class DynamicBuffer, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler> |
92f5a8d4 TL |
980 | BOOST_BEAST_ASYNC_RESULT2(ReadHandler) |
981 | stream<NextLayer, deflateSupported>:: | |
982 | async_read_some( | |
983 | DynamicBuffer& buffer, | |
984 | std::size_t limit, | |
985 | ReadHandler&& handler) | |
986 | { | |
987 | static_assert(is_async_stream<next_layer_type>::value, | |
988 | "AsyncStream type requirements not met"); | |
989 | static_assert( | |
990 | net::is_dynamic_buffer<DynamicBuffer>::value, | |
991 | "DynamicBuffer type requirements not met"); | |
992 | return net::async_initiate< | |
993 | ReadHandler, | |
994 | void(error_code, std::size_t)>( | |
995 | run_read_op{}, | |
996 | handler, | |
997 | impl_, | |
998 | &buffer, | |
999 | limit, | |
1000 | true); | |
1001 | } | |
1002 | ||
1003 | //------------------------------------------------------------------------------ | |
1004 | ||
1005 | template<class NextLayer, bool deflateSupported> | |
1006 | template<class MutableBufferSequence> | |
1007 | std::size_t | |
1008 | stream<NextLayer, deflateSupported>:: | |
1009 | read_some( | |
1010 | MutableBufferSequence const& buffers) | |
1011 | { | |
1012 | static_assert(is_sync_stream<next_layer_type>::value, | |
1013 | "SyncStream type requirements not met"); | |
1014 | static_assert(net::is_mutable_buffer_sequence< | |
1015 | MutableBufferSequence>::value, | |
1016 | "MutableBufferSequence type requirements not met"); | |
1017 | error_code ec; | |
1018 | auto const bytes_written = read_some(buffers, ec); | |
1019 | if(ec) | |
1020 | BOOST_THROW_EXCEPTION(system_error{ec}); | |
1021 | return bytes_written; | |
1022 | } | |
1023 | ||
// Synchronously read some message payload into a caller-supplied
// buffer sequence, reporting errors through ec.
//
// When a frame header is due it is parsed first, and any control
// frame found there is handled in place: a ping is answered with a
// pong written to the next layer, a pong is passed to the control
// callback, and a close frame goes through do_fail with error::closed.
// Payload bytes are then moved into `buffers`,
// either directly or through impl.inflate when impl.rd_deflated()
// is true. Text payloads are validated with impl.rd_utf8.
//
// @param buffers Destination for the payload bytes.
// @param ec      Cleared on entry; set when the stream must stop
//                or when the connection is failed via do_fail.
// @return The number of payload bytes written into `buffers` by
//         this call.
1024 | template<class NextLayer, bool deflateSupported> | |
1025 | template<class MutableBufferSequence> | |
1026 | std::size_t | |
1027 | stream<NextLayer, deflateSupported>:: | |
1028 | read_some( | |
1029 | MutableBufferSequence const& buffers, | |
1030 | error_code& ec) | |
1031 | { | |
1032 | static_assert(is_sync_stream<next_layer_type>::value, | |
1033 | "SyncStream type requirements not met"); | |
1034 | static_assert(net::is_mutable_buffer_sequence< | |
1035 | MutableBufferSequence>::value, | |
1036 | "MutableBufferSequence type requirements not met"); | |
1037 | using beast::detail::clamp; | |
1038 | auto& impl = *impl_; | |
1039 | close_code code{}; | |
1040 | std::size_t bytes_written = 0; | |
1041 | ec = {}; | |
1042 | // Make sure the stream is open | |
1043 | if(impl.check_stop_now(ec)) | |
1044 | return bytes_written; | |
// Re-entry point used after a control frame or an empty
// non-final frame has been handled.
1045 | loop: | |
1046 | // See if we need to read a frame header. This | |
1047 | // condition is structured to give the decompressor | |
1048 | // a chance to emit the final empty deflate block | |
1049 | // | |
1050 | if(impl.rd_remain == 0 && ( | |
1051 | ! impl.rd_fh.fin || impl.rd_done)) | |
1052 | { | |
1053 | // Read frame header | |
1054 | error_code result; | |
1055 | while(! impl.parse_fh(impl.rd_fh, impl.rd_buf, result)) | |
1056 | { | |
1057 | if(result) | |
1058 | { | |
1059 | // _Fail the WebSocket Connection_ | |
1060 | if(result == error::message_too_big) | |
1061 | code = close_code::too_big; | |
1062 | else | |
1063 | code = close_code::protocol_error; | |
1064 | do_fail(code, result, ec); | |
1065 | return bytes_written; | |
1066 | } | |
// parse_fh returned false without reporting an error, so read
// more bytes from the next layer into rd_buf and parse again.
1067 | auto const bytes_transferred = | |
1068 | impl.stream().read_some( | |
1069 | impl.rd_buf.prepare(read_size( | |
1070 | impl.rd_buf, impl.rd_buf.max_size())), | |
1071 | ec); | |
1072 | impl.rd_buf.commit(bytes_transferred); | |
1073 | if(impl.check_stop_now(ec)) | |
1074 | return bytes_written; | |
1075 | } | |
1076 | // Immediately apply the mask to the portion | |
1077 | // of the buffer holding payload data. | |
1078 | if(impl.rd_fh.len > 0 && impl.rd_fh.mask) | |
1079 | detail::mask_inplace(buffers_prefix( | |
1080 | clamp(impl.rd_fh.len), impl.rd_buf.data()), | |
1081 | impl.rd_key); | |
1082 | if(detail::is_control(impl.rd_fh.op)) | |
1083 | { | |
1084 | // Get control frame payload | |
1085 | auto const b = buffers_prefix( | |
1086 | clamp(impl.rd_fh.len), impl.rd_buf.data()); | |
1087 | auto const len = buffer_bytes(b); | |
1088 | BOOST_ASSERT(len == impl.rd_fh.len); | |
1089 | |
1090 | // Clear this otherwise the next | |
1091 | // frame will be considered final. | |
1092 | impl.rd_fh.fin = false; | |
1093 | |
1094 | // Handle ping frame | |
1095 | if(impl.rd_fh.op == detail::opcode::ping) | |
1096 | { | |
1097 | ping_data payload; | |
1098 | detail::read_ping(payload, b); | |
1099 | impl.rd_buf.consume(len); | |
1100 | if(impl.wr_close) | |
1101 | { | |
1102 | // Ignore ping when closing | |
1103 | goto loop; | |
1104 | } | |
1105 | if(impl.ctrl_cb) | |
1106 | impl.ctrl_cb(frame_type::ping, payload); | |
// Answer the ping: serialize a pong carrying the same payload
// and write it to the next layer before resuming the read.
1107 | detail::frame_buffer fb; | |
1108 | impl.template write_ping<flat_static_buffer_base>(fb, | |
1109 | detail::opcode::pong, payload); | |
1110 | net::write(impl.stream(), fb.data(), ec); | |
1111 | if(impl.check_stop_now(ec)) | |
1112 | return bytes_written; | |
1113 | goto loop; | |
1114 | } | |
1115 | // Handle pong frame | |
1116 | if(impl.rd_fh.op == detail::opcode::pong) | |
1117 | { | |
1118 | ping_data payload; | |
1119 | detail::read_ping(payload, b); | |
1120 | impl.rd_buf.consume(len); | |
1121 | if(impl.ctrl_cb) | |
1122 | impl.ctrl_cb(frame_type::pong, payload); | |
1123 | goto loop; | |
1124 | } | |
1125 | // Handle close frame | |
1126 | BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close); | |
1127 | { | |
1128 | BOOST_ASSERT(! impl.rd_close); | |
1129 | impl.rd_close = true; | |
1130 | close_reason cr; | |
1131 | detail::read_close(cr, b, result); | |
1132 | if(result) | |
1133 | { | |
1134 | // _Fail the WebSocket Connection_ | |
1135 | do_fail(close_code::protocol_error, | |
1136 | result, ec); | |
1137 | return bytes_written; | |
1138 | } | |
1139 | impl.cr = cr; | |
1140 | impl.rd_buf.consume(len); | |
1141 | if(impl.ctrl_cb) | |
1142 | impl.ctrl_cb(frame_type::close, impl.cr.reason); | |
1143 | BOOST_ASSERT(! impl.wr_close); | |
1144 | // _Start the WebSocket Closing Handshake_ | |
1145 | do_fail( | |
1146 | cr.code == close_code::none ? | |
1147 | close_code::normal : | |
1148 | static_cast<close_code>(cr.code), | |
1149 | error::closed, ec); | |
1150 | return bytes_written; | |
1151 | } | |
1152 | } | |
1153 | if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin) | |
1154 | { | |
1155 | // Empty non-final frame | |
1156 | goto loop; | |
1157 | } | |
1158 | impl.rd_done = false; | |
1159 | } | |
1160 | else | |
1161 | { | |
// No frame header was read on this pass; only reset the error code.
1162 | ec = {}; | |
1163 | } | |
// Uncompressed payload path.
1164 | if(! impl.rd_deflated()) | |
1165 | { | |
1166 | if(impl.rd_remain > 0) | |
1167 | { | |
1168 | if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() > | |
1169 | (std::min)(clamp(impl.rd_remain), | |
1170 | buffer_bytes(buffers))) | |
1171 | { | |
1172 | // Fill the read buffer first, otherwise we | |
1173 | // get fewer bytes at the cost of one I/O. | |
1174 | impl.rd_buf.commit(impl.stream().read_some( | |
1175 | impl.rd_buf.prepare(read_size(impl.rd_buf, | |
1176 | impl.rd_buf.max_size())), ec)); | |
1177 | if(impl.check_stop_now(ec)) | |
1178 | return bytes_written; | |
1179 | if(impl.rd_fh.mask) | |
1180 | detail::mask_inplace( | |
1181 | buffers_prefix(clamp(impl.rd_remain), | |
1182 | impl.rd_buf.data()), impl.rd_key); | |
1183 | } | |
1184 | if(impl.rd_buf.size() > 0) | |
1185 | { | |
1186 | // Copy from the read buffer. | |
1187 | // The mask was already applied. | |
1188 | auto const bytes_transferred = net::buffer_copy( | |
1189 | buffers, impl.rd_buf.data(), | |
1190 | clamp(impl.rd_remain)); | |
1191 | auto const mb = buffers_prefix( | |
1192 | bytes_transferred, buffers); | |
1193 | impl.rd_remain -= bytes_transferred; | |
1194 | if(impl.rd_op == detail::opcode::text) | |
1195 | { | |
1196 | if(! impl.rd_utf8.write(mb) || | |
1197 | (impl.rd_remain == 0 && impl.rd_fh.fin && | |
1198 | ! impl.rd_utf8.finish())) | |
1199 | { | |
1200 | // _Fail the WebSocket Connection_ | |
1201 | do_fail(close_code::bad_payload, | |
1202 | error::bad_frame_payload, ec); | |
1203 | return bytes_written; | |
1204 | } | |
1205 | } | |
1206 | bytes_written += bytes_transferred; | |
1207 | impl.rd_size += bytes_transferred; | |
1208 | impl.rd_buf.consume(bytes_transferred); | |
1209 | } | |
1210 | else | |
1211 | { | |
1212 | // Read into caller's buffer | |
1213 | BOOST_ASSERT(impl.rd_remain > 0); | |
1214 | BOOST_ASSERT(buffer_bytes(buffers) > 0); | |
1215 | BOOST_ASSERT(buffer_bytes(buffers_prefix( | |
1216 | clamp(impl.rd_remain), buffers)) > 0); | |
1217 | auto const bytes_transferred = | |
1218 | impl.stream().read_some(buffers_prefix( | |
1219 | clamp(impl.rd_remain), buffers), ec); | |
1220 | // VFALCO What if some bytes were written? | |
1221 | if(impl.check_stop_now(ec)) | |
1222 | return bytes_written; | |
1223 | BOOST_ASSERT(bytes_transferred > 0); | |
1224 | auto const mb = buffers_prefix( | |
1225 | bytes_transferred, buffers); | |
1226 | impl.rd_remain -= bytes_transferred; | |
// These bytes bypassed rd_buf, so the mask is applied here,
// directly on the bytes just read into the caller buffers.
1227 | if(impl.rd_fh.mask) | |
1228 | detail::mask_inplace(mb, impl.rd_key); | |
1229 | if(impl.rd_op == detail::opcode::text) | |
1230 | { | |
1231 | if(! impl.rd_utf8.write(mb) || | |
1232 | (impl.rd_remain == 0 && impl.rd_fh.fin && | |
1233 | ! impl.rd_utf8.finish())) | |
1234 | { | |
1235 | // _Fail the WebSocket Connection_ | |
1236 | do_fail(close_code::bad_payload, | |
1237 | error::bad_frame_payload, ec); | |
1238 | return bytes_written; | |
1239 | } | |
1240 | } | |
1241 | bytes_written += bytes_transferred; | |
1242 | impl.rd_size += bytes_transferred; | |
1243 | } | |
1244 | } | |
f67539c2 TL |
1245 | BOOST_ASSERT( ! impl.rd_done ); |
// The message is complete once the final frame has no payload left.
1246 | if( impl.rd_remain == 0 && impl.rd_fh.fin ) | |
1247 | impl.rd_done = true; | |
92f5a8d4 TL |
1248 | } |
1249 | else | |
1250 | { | |
1251 | // Read compressed message frame payload: | |
1252 | // inflate even if rd_fh.len == 0, otherwise we | |
1253 | // never emit the end-of-stream deflate block. | |
1254 | // | |
1255 | bool did_read = false; | |
1256 | buffers_suffix<MutableBufferSequence> cb(buffers); | |
// Each pass inflates into the first remaining output buffer of cb.
1257 | while(buffer_bytes(cb) > 0) | |
1258 | { | |
1259 | zlib::z_params zs; | |
1260 | { | |
1261 | auto const out = beast::buffers_front(cb); | |
1262 | zs.next_out = out.data(); | |
1263 | zs.avail_out = out.size(); | |
1264 | BOOST_ASSERT(zs.avail_out > 0); | |
1265 | } | |
1e59de90 TL |
1266 | // boolean to track the end of the message. |
1267 | bool fin = false; | |
92f5a8d4 TL |
1268 | if(impl.rd_remain > 0) |
1269 | { | |
1270 | if(impl.rd_buf.size() > 0) | |
1271 | { | |
1272 | // use what's there | |
1273 | auto const in = buffers_prefix( | |
1274 | clamp(impl.rd_remain), beast::buffers_front( | |
1275 | impl.rd_buf.data())); | |
1276 | zs.avail_in = in.size(); | |
1277 | zs.next_in = in.data(); | |
1278 | } | |
// rd_buf is empty: read from the next layer, but at most once
// per call, as tracked by did_read.
1279 | else if(! did_read) | |
1280 | { | |
1281 | // read new | |
1282 | auto const bytes_transferred = | |
1283 | impl.stream().read_some( | |
1284 | impl.rd_buf.prepare(read_size( | |
1285 | impl.rd_buf, impl.rd_buf.max_size())), | |
1286 | ec); | |
1287 | if(impl.check_stop_now(ec)) | |
1288 | return bytes_written; | |
1289 | BOOST_ASSERT(bytes_transferred > 0); | |
1290 | impl.rd_buf.commit(bytes_transferred); | |
1291 | if(impl.rd_fh.mask) | |
1292 | detail::mask_inplace( | |
1293 | buffers_prefix(clamp(impl.rd_remain), | |
1294 | impl.rd_buf.data()), impl.rd_key); | |
1295 | auto const in = buffers_prefix( | |
1296 | clamp(impl.rd_remain), buffers_front( | |
1297 | impl.rd_buf.data())); | |
1298 | zs.avail_in = in.size(); | |
1299 | zs.next_in = in.data(); | |
1300 | did_read = true; | |
1301 | } | |
1302 | else | |
1303 | { | |
1304 | break; | |
1305 | } | |
1306 | } | |
1307 | else if(impl.rd_fh.fin) | |
1308 | { | |
1309 | // append the empty block codes | |
1310 | static std::uint8_t constexpr | |
1e59de90 | 1311 | empty_block[4] = { 0x00, 0x00, 0xff, 0xff }; |
92f5a8d4 TL |
1312 | zs.next_in = empty_block; |
1313 | zs.avail_in = sizeof(empty_block); | |
1e59de90 | 1314 | fin = true; |
92f5a8d4 TL |
1315 | } |
1316 | else | |
1317 | { | |
1318 | break; | |
1319 | } | |
1320 | impl.inflate(zs, zlib::Flush::sync, ec); | |
1321 | if(impl.check_stop_now(ec)) | |
1322 | return bytes_written; | |
// Feeding the trailing empty block produced no output, so the
// message is finished: mark it done and leave the loop.
1e59de90 TL |
1323 | if (fin && zs.total_out == 0) { |
1324 | impl.do_context_takeover_read(impl.role); | |
1325 | impl.rd_done = true; | |
1326 | break; | |
1327 | } | |
92f5a8d4 TL |
1328 | if(impl.rd_msg_max && beast::detail::sum_exceeds( |
1329 | impl.rd_size, zs.total_out, impl.rd_msg_max)) | |
1330 | { | |
1331 | do_fail(close_code::too_big, | |
1332 | error::message_too_big, ec); | |
1333 | return bytes_written; | |
1334 | } | |
1335 | cb.consume(zs.total_out); | |
1336 | impl.rd_size += zs.total_out; | |
// When fin is set the input was the static empty_block, not
// rd_buf, so nothing is consumed from rd_buf or rd_remain.
1e59de90 TL |
1337 | if (! fin) { |
1338 | impl.rd_remain -= zs.total_in; | |
1339 | impl.rd_buf.consume(zs.total_in); | |
1340 | } | |
92f5a8d4 TL |
1341 | bytes_written += zs.total_out; |
1342 | } | |
1343 | if(impl.rd_op == detail::opcode::text) | |
1344 | { | |
1345 | // check utf8 | |
1346 | if(! impl.rd_utf8.write(beast::buffers_prefix( | |
1347 | bytes_written, buffers)) || ( | |
1348 | impl.rd_done && ! impl.rd_utf8.finish())) | |
1349 | { | |
1350 | // _Fail the WebSocket Connection_ | |
1351 | do_fail(close_code::bad_payload, | |
1352 | error::bad_frame_payload, ec); | |
1353 | return bytes_written; | |
1354 | } | |
1355 | } | |
1356 | } | |
1357 | return bytes_written; | |
1358 | } | |
1359 | ||
1360 | template<class NextLayer, bool deflateSupported> | |
20effc67 | 1361 | template<class MutableBufferSequence, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler> |
92f5a8d4 TL |
1362 | BOOST_BEAST_ASYNC_RESULT2(ReadHandler) |
1363 | stream<NextLayer, deflateSupported>:: | |
1364 | async_read_some( | |
1365 | MutableBufferSequence const& buffers, | |
1366 | ReadHandler&& handler) | |
1367 | { | |
1368 | static_assert(is_async_stream<next_layer_type>::value, | |
1369 | "AsyncStream type requirements not met"); | |
1370 | static_assert(net::is_mutable_buffer_sequence< | |
1371 | MutableBufferSequence>::value, | |
1372 | "MutableBufferSequence type requirements not met"); | |
1373 | return net::async_initiate< | |
1374 | ReadHandler, | |
1375 | void(error_code, std::size_t)>( | |
1376 | run_read_some_op{}, | |
1377 | handler, | |
1378 | impl_, | |
1379 | buffers); | |
1380 | } | |
1381 | ||
1382 | } // websocket | |
1383 | } // beast | |
1384 | } // boost | |
1385 | ||
1386 | #endif |