]>
Commit | Line | Data |
---|---|---|
92f5a8d4 TL |
1 | // |
2 | // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com) | |
3 | // | |
4 | // Distributed under the Boost Software License, Version 1.0. (See accompanying | |
5 | // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) | |
6 | // | |
7 | // Official repository: https://github.com/boostorg/beast | |
8 | // | |
9 | ||
10 | #ifndef BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP | |
11 | #define BOOST_BEAST_WEBSOCKET_IMPL_READ_HPP | |
12 | ||
13 | #include <boost/beast/core/buffer_traits.hpp> | |
14 | #include <boost/beast/websocket/teardown.hpp> | |
15 | #include <boost/beast/websocket/detail/mask.hpp> | |
16 | #include <boost/beast/websocket/impl/stream_impl.hpp> | |
17 | #include <boost/beast/core/async_base.hpp> | |
18 | #include <boost/beast/core/bind_handler.hpp> | |
19 | #include <boost/beast/core/buffers_prefix.hpp> | |
20 | #include <boost/beast/core/buffers_suffix.hpp> | |
21 | #include <boost/beast/core/flat_static_buffer.hpp> | |
22 | #include <boost/beast/core/read_size.hpp> | |
23 | #include <boost/beast/core/stream_traits.hpp> | |
24 | #include <boost/beast/core/detail/bind_continuation.hpp> | |
25 | #include <boost/beast/core/detail/buffer.hpp> | |
26 | #include <boost/beast/core/detail/clamp.hpp> | |
27 | #include <boost/beast/core/detail/config.hpp> | |
28 | #include <boost/asio/coroutine.hpp> | |
29 | #include <boost/asio/post.hpp> | |
30 | #include <boost/assert.hpp> | |
31 | #include <boost/config.hpp> | |
32 | #include <boost/optional.hpp> | |
33 | #include <boost/throw_exception.hpp> | |
34 | #include <algorithm> | |
35 | #include <limits> | |
36 | #include <memory> | |
37 | ||
38 | namespace boost { | |
39 | namespace beast { | |
40 | namespace websocket { | |
41 | ||
42 | /* Read some message data into a buffer sequence. | |
43 | ||
44 | Also reads and handles control frames. | |
45 | */ | |
// Composed asynchronous operation that reads some message data into a
// caller-supplied mutable buffer sequence, handling any control frames
// (ping, pong, close) it encounters along the way.
//
// The operation is written as a stackless coroutine: operator() is
// re-entered each time an intermediate operation completes. It holds only
// a weak reference to the stream implementation; if that object is gone
// on resumption, the operation completes with operation_aborted and a
// byte count of zero.
46 | template<class NextLayer, bool deflateSupported> | |
47 | template<class Handler, class MutableBufferSequence> | |
48 | class stream<NextLayer, deflateSupported>::read_some_op | |
49 | : public beast::async_base< | |
50 | Handler, beast::executor_type<stream>> | |
51 | , public asio::coroutine | |
52 | { | |
// Weak reference to the stream implementation; locked on every resumption.
53 | boost::weak_ptr<impl_type> wp_; | |
// The caller's buffer sequence exactly as passed in (used for the UTF-8
// check over everything written on the compressed path).
54 | MutableBufferSequence bs_; | |
// View over the caller's buffers; the compressed path consumes from it
// as inflated output is produced.
55 | buffers_suffix<MutableBufferSequence> cb_; | |
// Running count of bytes placed in the caller's buffers; this is the
// value handed to the completion handler.
56 | std::size_t bytes_written_ = 0; | |
// Error reported after the close/teardown sequence when teardown itself
// did not produce an error.
57 | error_code result_; | |
// Close code serialized into the close frame if this operation sends one.
58 | close_code code_; | |
// Set once the compressed path has issued a stream read, so that it does
// not issue a second one within the same operation.
59 | bool did_read_ = false; | |
60 | ||
61 | public: | |
62 | static constexpr int id = 1; // for soft_mutex | |
63 | ||
// Stores the arguments and immediately starts the operation by invoking
// operator() with cont == false.
64 | template<class Handler_> | |
65 | read_some_op( | |
66 | Handler_&& h, | |
67 | boost::shared_ptr<impl_type> const& sp, | |
68 | MutableBufferSequence const& bs) | |
69 | : async_base< | |
70 | Handler, beast::executor_type<stream>>( | |
71 | std::forward<Handler_>(h), | |
72 | sp->stream().get_executor()) | |
73 | , wp_(sp) | |
74 | , bs_(bs) | |
75 | , cb_(bs) | |
76 | , code_(close_code::none) | |
77 | { | |
78 | (*this)({}, 0, false); | |
79 | } | |
80 | ||
// Coroutine body.
//   ec                - result of the intermediate operation that just completed
//   bytes_transferred - byte count reported by that intermediate operation
//   cont              - forwarded to complete(); false only for the initial
//                       call made from the constructor
81 | void operator()( | |
82 | error_code ec = {}, | |
83 | std::size_t bytes_transferred = 0, | |
84 | bool cont = true) | |
85 | { | |
86 | using beast::detail::clamp; | |
87 | auto sp = wp_.lock(); | |
88 | if(! sp) | |
89 | { | |
// The implementation object no longer exists: finish with
// operation_aborted and report zero bytes.
90 | ec = net::error::operation_aborted; | |
91 | bytes_written_ = 0; | |
92 | return this->complete(cont, ec, bytes_written_); | |
93 | } | |
94 | auto& impl = *sp; | |
95 | BOOST_ASIO_CORO_REENTER(*this) | |
96 | { | |
97 | impl.update_timer(this->get_executor()); | |
98 | ||
99 | acquire_read_lock: | |
100 | // Acquire the read lock | |
101 | if(! impl.rd_block.try_lock(this)) | |
102 | { | |
103 | do_suspend: | |
// Park this operation in op_r_rd; when it is resumed it takes the read
// lock and re-posts itself before continuing.
104 | BOOST_ASIO_CORO_YIELD | |
105 | impl.op_r_rd.emplace(std::move(*this)); | |
106 | impl.rd_block.lock(this); | |
107 | BOOST_ASIO_CORO_YIELD | |
108 | net::post(std::move(*this)); | |
109 | BOOST_ASSERT(impl.rd_block.is_locked(this)); | |
110 | ||
111 | // VFALCO Is this check correct here? | |
112 | BOOST_ASSERT(! ec && impl.check_stop_now(ec)); | |
113 | if(impl.check_stop_now(ec)) | |
114 | { | |
115 | BOOST_ASSERT(ec == net::error::operation_aborted); | |
116 | goto upcall; | |
117 | } | |
118 | // VFALCO Should never get here | |
119 | ||
120 | // The only way to get read blocked is if | |
121 | // a `close_op` wrote a close frame | |
122 | BOOST_ASSERT(impl.wr_close); | |
123 | BOOST_ASSERT(impl.status_ != status::open); | |
124 | ec = net::error::operation_aborted; | |
125 | goto upcall; | |
126 | } | |
127 | else | |
128 | { | |
129 | // Make sure the stream is not closed | |
130 | if( impl.status_ == status::closed || | |
131 | impl.status_ == status::failed) | |
132 | { | |
133 | ec = net::error::operation_aborted; | |
134 | goto upcall; | |
135 | } | |
136 | } | |
137 | ||
138 | // if status_ == status::closing, we want to suspend | |
139 | // the read operation until the close completes, | |
140 | // then finish the read with operation_aborted. | |
141 | ||
142 | loop: | |
143 | BOOST_ASSERT(impl.rd_block.is_locked(this)); | |
144 | // See if we need to read a frame header. This | |
145 | // condition is structured to give the decompressor | |
146 | // a chance to emit the final empty deflate block | |
147 | // | |
148 | if(impl.rd_remain == 0 && | |
149 | (! impl.rd_fh.fin || impl.rd_done)) | |
150 | { | |
151 | // Read frame header | |
152 | while(! impl.parse_fh( | |
153 | impl.rd_fh, impl.rd_buf, result_)) | |
154 | { | |
155 | if(result_) | |
156 | { | |
157 | // _Fail the WebSocket Connection_ | |
158 | if(result_ == error::message_too_big) | |
159 | code_ = close_code::too_big; | |
160 | else | |
161 | code_ = close_code::protocol_error; | |
162 | goto close; | |
163 | } | |
// parse_fh returned false without setting an error: read more bytes
// from the stream into rd_buf and try again.
164 | BOOST_ASSERT(impl.rd_block.is_locked(this)); | |
165 | BOOST_ASIO_CORO_YIELD | |
166 | impl.stream().async_read_some( | |
167 | impl.rd_buf.prepare(read_size( | |
168 | impl.rd_buf, impl.rd_buf.max_size())), | |
169 | std::move(*this)); | |
170 | BOOST_ASSERT(impl.rd_block.is_locked(this)); | |
171 | impl.rd_buf.commit(bytes_transferred); | |
172 | if(impl.check_stop_now(ec)) | |
173 | goto upcall; | |
174 | impl.reset_idle(); | |
175 | ||
176 | // Allow a close operation | |
177 | // to acquire the read block | |
178 | impl.rd_block.unlock(this); | |
179 | if( impl.op_r_close.maybe_invoke()) | |
180 | { | |
181 | // Suspend | |
182 | BOOST_ASSERT(impl.rd_block.is_locked()); | |
183 | goto do_suspend; | |
184 | } | |
185 | // Acquire read block | |
186 | impl.rd_block.lock(this); | |
187 | } | |
188 | // Immediately apply the mask to the portion | |
189 | // of the buffer holding payload data. | |
190 | if(impl.rd_fh.len > 0 && impl.rd_fh.mask) | |
191 | detail::mask_inplace(buffers_prefix( | |
192 | clamp(impl.rd_fh.len), | |
193 | impl.rd_buf.data()), | |
194 | impl.rd_key); | |
195 | if(detail::is_control(impl.rd_fh.op)) | |
196 | { | |
197 | // Clear this otherwise the next | |
198 | // frame will be considered final. | |
199 | impl.rd_fh.fin = false; | |
200 | ||
201 | // Handle ping frame | |
202 | if(impl.rd_fh.op == detail::opcode::ping) | |
203 | { | |
204 | if(impl.ctrl_cb) | |
205 | { | |
// A control callback is installed and this is still the initial
// (cont == false) invocation: post first so the callback below runs
// from a resumed invocation.
206 | if(! cont) | |
207 | { | |
208 | BOOST_ASIO_CORO_YIELD | |
209 | net::post(std::move(*this)); | |
210 | BOOST_ASSERT(cont); | |
211 | // VFALCO call check_stop_now() here? | |
212 | } | |
213 | } | |
214 | { | |
215 | auto const b = buffers_prefix( | |
216 | clamp(impl.rd_fh.len), | |
217 | impl.rd_buf.data()); | |
218 | auto const len = buffer_bytes(b); | |
219 | BOOST_ASSERT(len == impl.rd_fh.len); | |
220 | ping_data payload; | |
221 | detail::read_ping(payload, b); | |
222 | impl.rd_buf.consume(len); | |
223 | // Ignore ping when closing | |
224 | if(impl.status_ == status::closing) | |
225 | goto loop; | |
226 | if(impl.ctrl_cb) | |
227 | impl.ctrl_cb( | |
228 | frame_type::ping, payload); | |
// Serialize the pong reply (echoing the ping payload) into rd_fb.
229 | impl.rd_fb.clear(); | |
230 | impl.template write_ping< | |
231 | flat_static_buffer_base>(impl.rd_fb, | |
232 | detail::opcode::pong, payload); | |
233 | } | |
234 | ||
235 | // Allow a close operation | |
236 | // to acquire the read block | |
237 | impl.rd_block.unlock(this); | |
238 | impl.op_r_close.maybe_invoke(); | |
239 | ||
240 | // Acquire the write lock | |
241 | if(! impl.wr_block.try_lock(this)) | |
242 | { | |
243 | BOOST_ASIO_CORO_YIELD | |
244 | impl.op_rd.emplace(std::move(*this)); | |
245 | impl.wr_block.lock(this); | |
246 | BOOST_ASIO_CORO_YIELD | |
247 | net::post(std::move(*this)); | |
248 | BOOST_ASSERT(impl.wr_block.is_locked(this)); | |
249 | if(impl.check_stop_now(ec)) | |
250 | goto upcall; | |
251 | } | |
252 | ||
253 | // Send pong | |
254 | BOOST_ASSERT(impl.wr_block.is_locked(this)); | |
255 | BOOST_ASIO_CORO_YIELD | |
256 | net::async_write( | |
257 | impl.stream(), impl.rd_fb.data(), | |
258 | beast::detail::bind_continuation(std::move(*this))); | |
259 | BOOST_ASSERT(impl.wr_block.is_locked(this)); | |
260 | if(impl.check_stop_now(ec)) | |
261 | goto upcall; | |
// Release the write lock and resume at most one suspended operation
// (the || chain stops at the first maybe_invoke() that returns true).
262 | impl.wr_block.unlock(this); | |
263 | impl.op_close.maybe_invoke() | |
264 | || impl.op_idle_ping.maybe_invoke() | |
265 | || impl.op_ping.maybe_invoke() | |
266 | || impl.op_wr.maybe_invoke(); | |
267 | goto acquire_read_lock; | |
268 | } | |
269 | ||
270 | // Handle pong frame | |
271 | if(impl.rd_fh.op == detail::opcode::pong) | |
272 | { | |
273 | // Ignore pong when closing | |
274 | if(! impl.wr_close && impl.ctrl_cb) | |
275 | { | |
276 | if(! cont) | |
277 | { | |
278 | BOOST_ASIO_CORO_YIELD | |
279 | net::post(std::move(*this)); | |
280 | BOOST_ASSERT(cont); | |
281 | } | |
282 | } | |
283 | auto const cb = buffers_prefix(clamp( | |
284 | impl.rd_fh.len), impl.rd_buf.data()); | |
285 | auto const len = buffer_bytes(cb); | |
286 | BOOST_ASSERT(len == impl.rd_fh.len); | |
287 | ping_data payload; | |
288 | detail::read_ping(payload, cb); | |
289 | impl.rd_buf.consume(len); | |
290 | // Ignore pong when closing | |
291 | if(! impl.wr_close && impl.ctrl_cb) | |
292 | impl.ctrl_cb(frame_type::pong, payload); | |
293 | goto loop; | |
294 | } | |
295 | ||
296 | // Handle close frame | |
297 | BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close); | |
298 | { | |
299 | if(impl.ctrl_cb) | |
300 | { | |
301 | if(! cont) | |
302 | { | |
303 | BOOST_ASIO_CORO_YIELD | |
304 | net::post(std::move(*this)); | |
305 | BOOST_ASSERT(cont); | |
306 | } | |
307 | } | |
308 | auto const cb = buffers_prefix(clamp( | |
309 | impl.rd_fh.len), impl.rd_buf.data()); | |
310 | auto const len = buffer_bytes(cb); | |
311 | BOOST_ASSERT(len == impl.rd_fh.len); | |
312 | BOOST_ASSERT(! impl.rd_close); | |
313 | impl.rd_close = true; | |
314 | close_reason cr; | |
315 | detail::read_close(cr, cb, result_); | |
316 | if(result_) | |
317 | { | |
318 | // _Fail the WebSocket Connection_ | |
319 | code_ = close_code::protocol_error; | |
320 | goto close; | |
321 | } | |
322 | impl.cr = cr; | |
323 | impl.rd_buf.consume(len); | |
324 | if(impl.ctrl_cb) | |
325 | impl.ctrl_cb(frame_type::close, | |
326 | impl.cr.reason); | |
327 | // See if we are already closing | |
328 | if(impl.status_ == status::closing) | |
329 | { | |
330 | // _Close the WebSocket Connection_ | |
331 | BOOST_ASSERT(impl.wr_close); | |
332 | code_ = close_code::none; | |
333 | result_ = error::closed; | |
334 | goto close; | |
335 | } | |
336 | // _Start the WebSocket Closing Handshake_ | |
337 | code_ = cr.code == close_code::none ? | |
338 | close_code::normal : | |
339 | static_cast<close_code>(cr.code); | |
340 | result_ = error::closed; | |
341 | goto close; | |
342 | } | |
343 | } | |
344 | if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin) | |
345 | { | |
346 | // Empty non-final frame | |
347 | goto loop; | |
348 | } | |
349 | impl.rd_done = false; | |
350 | } | |
// Payload that does not go through the decompressor; the else branch
// further down handles the compressed case.
351 | if(! impl.rd_deflated()) | |
352 | { | |
353 | if(impl.rd_remain > 0) | |
354 | { | |
355 | if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() > | |
356 | (std::min)(clamp(impl.rd_remain), | |
357 | buffer_bytes(cb_))) | |
358 | { | |
359 | // Fill the read buffer first, otherwise we | |
360 | // get fewer bytes at the cost of one I/O. | |
361 | BOOST_ASIO_CORO_YIELD | |
362 | impl.stream().async_read_some( | |
363 | impl.rd_buf.prepare(read_size( | |
364 | impl.rd_buf, impl.rd_buf.max_size())), | |
365 | std::move(*this)); | |
366 | impl.rd_buf.commit(bytes_transferred); | |
367 | if(impl.check_stop_now(ec)) | |
368 | goto upcall; | |
369 | impl.reset_idle(); | |
370 | if(impl.rd_fh.mask) | |
371 | detail::mask_inplace(buffers_prefix(clamp( | |
372 | impl.rd_remain), impl.rd_buf.data()), | |
373 | impl.rd_key); | |
374 | } | |
375 | if(impl.rd_buf.size() > 0) | |
376 | { | |
377 | // Copy from the read buffer. | |
378 | // The mask was already applied. | |
379 | bytes_transferred = net::buffer_copy(cb_, | |
380 | impl.rd_buf.data(), clamp(impl.rd_remain)); | |
381 | auto const mb = buffers_prefix( | |
382 | bytes_transferred, cb_); | |
383 | impl.rd_remain -= bytes_transferred; | |
384 | if(impl.rd_op == detail::opcode::text) | |
385 | { | |
// Validate the newly copied bytes as UTF-8; on the last bytes of a
// final frame also require the sequence to be complete.
386 | if(! impl.rd_utf8.write(mb) || | |
387 | (impl.rd_remain == 0 && impl.rd_fh.fin && | |
388 | ! impl.rd_utf8.finish())) | |
389 | { | |
390 | // _Fail the WebSocket Connection_ | |
391 | code_ = close_code::bad_payload; | |
392 | result_ = error::bad_frame_payload; | |
393 | goto close; | |
394 | } | |
395 | } | |
396 | bytes_written_ += bytes_transferred; | |
397 | impl.rd_size += bytes_transferred; | |
398 | impl.rd_buf.consume(bytes_transferred); | |
399 | } | |
400 | else | |
401 | { | |
402 | // Read into caller's buffer | |
403 | BOOST_ASSERT(impl.rd_remain > 0); | |
404 | BOOST_ASSERT(buffer_bytes(cb_) > 0); | |
405 | BOOST_ASSERT(buffer_bytes(buffers_prefix( | |
406 | clamp(impl.rd_remain), cb_)) > 0); | |
407 | BOOST_ASIO_CORO_YIELD | |
408 | impl.stream().async_read_some(buffers_prefix( | |
409 | clamp(impl.rd_remain), cb_), std::move(*this)); | |
410 | if(impl.check_stop_now(ec)) | |
411 | goto upcall; | |
412 | impl.reset_idle(); | |
413 | BOOST_ASSERT(bytes_transferred > 0); | |
414 | auto const mb = buffers_prefix( | |
415 | bytes_transferred, cb_); | |
416 | impl.rd_remain -= bytes_transferred; | |
// Bytes read straight into the caller's buffer have not been unmasked
// yet, so do it here.
417 | if(impl.rd_fh.mask) | |
418 | detail::mask_inplace(mb, impl.rd_key); | |
419 | if(impl.rd_op == detail::opcode::text) | |
420 | { | |
421 | if(! impl.rd_utf8.write(mb) || | |
422 | (impl.rd_remain == 0 && impl.rd_fh.fin && | |
423 | ! impl.rd_utf8.finish())) | |
424 | { | |
425 | // _Fail the WebSocket Connection_ | |
426 | code_ = close_code::bad_payload; | |
427 | result_ = error::bad_frame_payload; | |
428 | goto close; | |
429 | } | |
430 | } | |
431 | bytes_written_ += bytes_transferred; | |
432 | impl.rd_size += bytes_transferred; | |
433 | } | |
434 | } | |
f67539c2 TL |
// The message is complete once the final frame has no payload remaining.
435 | BOOST_ASSERT( ! impl.rd_done ); |
436 | if( impl.rd_remain == 0 && impl.rd_fh.fin ) | |
437 | impl.rd_done = true; | |
92f5a8d4 TL |
438 | } |
439 | else | |
440 | { | |
441 | // Read compressed message frame payload: | |
442 | // inflate even if rd_fh_.len == 0, otherwise we | |
443 | // never emit the end-of-stream deflate block. | |
444 | while(buffer_bytes(cb_) > 0) | |
445 | { | |
// At most one stream read is issued from this loop per operation
// (guarded by did_read_).
446 | if( impl.rd_remain > 0 && | |
447 | impl.rd_buf.size() == 0 && | |
448 | ! did_read_) | |
449 | { | |
450 | // read new | |
451 | BOOST_ASIO_CORO_YIELD | |
452 | impl.stream().async_read_some( | |
453 | impl.rd_buf.prepare(read_size( | |
454 | impl.rd_buf, impl.rd_buf.max_size())), | |
455 | std::move(*this)); | |
456 | if(impl.check_stop_now(ec)) | |
457 | goto upcall; | |
458 | impl.reset_idle(); | |
459 | BOOST_ASSERT(bytes_transferred > 0); | |
460 | impl.rd_buf.commit(bytes_transferred); | |
461 | if(impl.rd_fh.mask) | |
462 | detail::mask_inplace( | |
463 | buffers_prefix(clamp(impl.rd_remain), | |
464 | impl.rd_buf.data()), impl.rd_key); | |
465 | did_read_ = true; | |
466 | } | |
// Point the inflater's output at the first remaining caller buffer.
467 | zlib::z_params zs; | |
468 | { | |
469 | auto const out = buffers_front(cb_); | |
470 | zs.next_out = out.data(); | |
471 | zs.avail_out = out.size(); | |
472 | BOOST_ASSERT(zs.avail_out > 0); | |
473 | } | |
474 | if(impl.rd_remain > 0) | |
475 | { | |
476 | if(impl.rd_buf.size() > 0) | |
477 | { | |
478 | // use what's there | |
479 | auto const in = buffers_prefix( | |
480 | clamp(impl.rd_remain), buffers_front( | |
481 | impl.rd_buf.data())); | |
482 | zs.avail_in = in.size(); | |
483 | zs.next_in = in.data(); | |
484 | } | |
485 | else | |
486 | { | |
487 | break; | |
488 | } | |
489 | } | |
490 | else if(impl.rd_fh.fin) | |
491 | { | |
492 | // append the empty block codes | |
493 | std::uint8_t constexpr | |
494 | empty_block[4] = { 0x00, 0x00, 0xff, 0xff }; | |
495 | zs.next_in = empty_block; | |
496 | zs.avail_in = sizeof(empty_block); | |
497 | impl.inflate(zs, zlib::Flush::sync, ec); | |
498 | if(! ec) | |
499 | { | |
500 | // https://github.com/madler/zlib/issues/280 | |
501 | if(zs.total_out > 0) | |
502 | ec = error::partial_deflate_block; | |
503 | } | |
504 | if(impl.check_stop_now(ec)) | |
505 | goto upcall; | |
506 | impl.do_context_takeover_read(impl.role); | |
507 | impl.rd_done = true; | |
508 | break; | |
509 | } | |
510 | else | |
511 | { | |
512 | break; | |
513 | } | |
514 | impl.inflate(zs, zlib::Flush::sync, ec); | |
515 | if(impl.check_stop_now(ec)) | |
516 | goto upcall; | |
// Enforce the message size limit, but only when one is set
// (rd_msg_max is non-zero).
517 | if(impl.rd_msg_max && beast::detail::sum_exceeds( | |
518 | impl.rd_size, zs.total_out, impl.rd_msg_max)) | |
519 | { | |
520 | // _Fail the WebSocket Connection_ | |
521 | code_ = close_code::too_big; | |
522 | result_ = error::message_too_big; | |
523 | goto close; | |
524 | } | |
// Account for the output produced and the input consumed by this
// inflate call.
525 | cb_.consume(zs.total_out); | |
526 | impl.rd_size += zs.total_out; | |
527 | impl.rd_remain -= zs.total_in; | |
528 | impl.rd_buf.consume(zs.total_in); | |
529 | bytes_written_ += zs.total_out; | |
530 | } | |
531 | if(impl.rd_op == detail::opcode::text) | |
532 | { | |
533 | // check utf8 | |
534 | if(! impl.rd_utf8.write( | |
535 | buffers_prefix(bytes_written_, bs_)) || ( | |
536 | impl.rd_done && ! impl.rd_utf8.finish())) | |
537 | { | |
538 | // _Fail the WebSocket Connection_ | |
539 | code_ = close_code::bad_payload; | |
540 | result_ = error::bad_frame_payload; | |
541 | goto close; | |
542 | } | |
543 | } | |
544 | } | |
545 | goto upcall; | |
546 | ||
// Close/fail path: take the write lock, send a close frame carrying code_
// unless one was already written, tear down the stream, and then report
// result_ (or the teardown error, if there was one).
547 | close: | |
548 | // Acquire the write lock | |
549 | if(! impl.wr_block.try_lock(this)) | |
550 | { | |
551 | BOOST_ASIO_CORO_YIELD | |
552 | impl.op_rd.emplace(std::move(*this)); | |
553 | impl.wr_block.lock(this); | |
554 | BOOST_ASIO_CORO_YIELD | |
555 | net::post(std::move(*this)); | |
556 | BOOST_ASSERT(impl.wr_block.is_locked(this)); | |
557 | if(impl.check_stop_now(ec)) | |
558 | goto upcall; | |
559 | } | |
560 | ||
561 | impl.change_status(status::closing); | |
562 | ||
563 | if(! impl.wr_close) | |
564 | { | |
565 | impl.wr_close = true; | |
566 | ||
567 | // Serialize close frame | |
568 | impl.rd_fb.clear(); | |
569 | impl.template write_close< | |
570 | flat_static_buffer_base>( | |
571 | impl.rd_fb, code_); | |
572 | ||
573 | // Send close frame | |
574 | BOOST_ASSERT(impl.wr_block.is_locked(this)); | |
575 | BOOST_ASIO_CORO_YIELD | |
576 | net::async_write(impl.stream(), impl.rd_fb.data(), | |
577 | beast::detail::bind_continuation(std::move(*this))); | |
578 | BOOST_ASSERT(impl.wr_block.is_locked(this)); | |
579 | if(impl.check_stop_now(ec)) | |
580 | goto upcall; | |
581 | } | |
582 | ||
583 | // Teardown | |
584 | using beast::websocket::async_teardown; | |
585 | BOOST_ASSERT(impl.wr_block.is_locked(this)); | |
586 | BOOST_ASIO_CORO_YIELD | |
587 | async_teardown(impl.role, impl.stream(), | |
588 | beast::detail::bind_continuation(std::move(*this))); | |
589 | BOOST_ASSERT(impl.wr_block.is_locked(this)); | |
590 | if(ec == net::error::eof) | |
591 | { | |
592 | // Rationale: | |
593 | // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error | |
594 | ec = {}; | |
595 | } | |
// A clean teardown reports the stored result_; any error other than
// error::closed marks the stream as failed rather than closed.
596 | if(! ec) | |
597 | ec = result_; | |
598 | if(ec && ec != error::closed) | |
599 | impl.change_status(status::failed); | |
600 | else | |
601 | impl.change_status(status::closed); | |
602 | impl.close(); | |
603 | ||
// Common exit: release whichever locks this operation holds, give
// suspended operations a chance to resume, then invoke the final handler.
604 | upcall: | |
605 | impl.rd_block.try_unlock(this); | |
606 | impl.op_r_close.maybe_invoke(); | |
607 | if(impl.wr_block.try_unlock(this)) | |
608 | impl.op_close.maybe_invoke() | |
609 | || impl.op_idle_ping.maybe_invoke() | |
610 | || impl.op_ping.maybe_invoke() | |
611 | || impl.op_wr.maybe_invoke(); | |
612 | this->complete(cont, ec, bytes_written_); | |
613 | } | |
614 | } | |
615 | }; | |
616 | ||
617 | //------------------------------------------------------------------------------ | |
618 | ||
// Composed asynchronous operation that reads message data into a
// DynamicBuffer by running read_some_op in a loop.
//
// Each iteration prepares writable space in the dynamic buffer, starts a
// read_some_op with this operation as its handler, and commits the bytes
// reported on completion. The loop ends after one iteration when some_ is
// true, otherwise when impl.rd_done becomes true or an error occurs.
619 | template<class NextLayer, bool deflateSupported> | |
620 | template<class Handler, class DynamicBuffer> | |
621 | class stream<NextLayer, deflateSupported>::read_op | |
622 | : public beast::async_base< | |
623 | Handler, beast::executor_type<stream>> | |
624 | , public asio::coroutine | |
625 | { | |
// Weak reference to the stream implementation; locked on every resumption.
626 | boost::weak_ptr<impl_type> wp_; | |
// The caller's dynamic buffer (held by reference, not owned).
627 | DynamicBuffer& b_; | |
// Upper bound on the size prepared per iteration; a constructor argument
// of zero is stored as the maximum std::size_t value.
628 | std::size_t limit_; | |
// Total bytes committed to b_ so far; reported to the handler.
629 | std::size_t bytes_written_ = 0; | |
// When true, stop after a single read_some_op.
630 | bool some_; | |
631 | ||
632 | public: | |
// Stores the arguments and immediately starts the operation by invoking
// operator() with cont == false.
633 | template<class Handler_> | |
634 | read_op( | |
635 | Handler_&& h, | |
636 | boost::shared_ptr<impl_type> const& sp, | |
637 | DynamicBuffer& b, | |
638 | std::size_t limit, | |
639 | bool some) | |
640 | : async_base<Handler, | |
641 | beast::executor_type<stream>>( | |
642 | std::forward<Handler_>(h), | |
643 | sp->stream().get_executor()) | |
644 | , wp_(sp) | |
645 | , b_(b) | |
646 | , limit_(limit ? limit : ( | |
647 | std::numeric_limits<std::size_t>::max)()) | |
648 | , some_(some) | |
649 | { | |
650 | (*this)({}, 0, false); | |
651 | } | |
652 | ||
// Coroutine body.
//   ec                - result of the read_some_op that just completed
//   bytes_transferred - bytes that read_some_op reported
//   cont              - forwarded to complete(); false only for the initial
//                       call made from the constructor
653 | void operator()( | |
654 | error_code ec = {}, | |
655 | std::size_t bytes_transferred = 0, | |
656 | bool cont = true) | |
657 | { | |
658 | using beast::detail::clamp; | |
659 | auto sp = wp_.lock(); | |
660 | if(! sp) | |
661 | { | |
// The implementation object no longer exists: finish with
// operation_aborted and report zero bytes.
662 | ec = net::error::operation_aborted; | |
663 | bytes_written_ = 0; | |
664 | return this->complete(cont, ec, bytes_written_); | |
665 | } | |
666 | auto& impl = *sp; | |
667 | using mutable_buffers_type = typename | |
668 | DynamicBuffer::mutable_buffers_type; | |
669 | BOOST_ASIO_CORO_REENTER(*this) | |
670 | { | |
671 | do | |
672 | { | |
673 | // VFALCO TODO use boost::beast::bind_continuation | |
674 | BOOST_ASIO_CORO_YIELD | |
675 | { | |
// Prepare writable space (size hint clamped to limit_); a failure is
// reported through ec as error::buffer_overflow.
676 | auto mb = beast::detail::dynamic_buffer_prepare(b_, | |
677 | clamp(impl.read_size_hint_db(b_), limit_), | |
678 | ec, error::buffer_overflow); | |
679 | if(impl.check_stop_now(ec)) | |
680 | goto upcall; | |
// The temporary read_some_op starts itself in its constructor and uses
// this operation (moved in) as its completion handler.
681 | read_some_op<read_op, mutable_buffers_type>( | |
682 | std::move(*this), sp, *mb); | |
683 | } | |
684 | ||
// Resumed by read_some_op: make the bytes it produced part of the
// readable region of b_.
685 | b_.commit(bytes_transferred); | |
686 | bytes_written_ += bytes_transferred; | |
687 | if(ec) | |
688 | goto upcall; | |
689 | } | |
690 | while(! some_ && ! impl.rd_done); | |
691 | ||
692 | upcall: | |
693 | this->complete(cont, ec, bytes_written_); | |
694 | } | |
695 | } | |
696 | }; | |
697 | ||
698 | template<class NextLayer, bool deflateSupported> | |
699 | struct stream<NextLayer, deflateSupported>:: | |
700 | run_read_some_op | |
701 | { | |
702 | template< | |
703 | class ReadHandler, | |
704 | class MutableBufferSequence> | |
705 | void | |
706 | operator()( | |
707 | ReadHandler&& h, | |
708 | boost::shared_ptr<impl_type> const& sp, | |
709 | MutableBufferSequence const& b) | |
710 | { | |
711 | // If you get an error on the following line it means | |
712 | // that your handler does not meet the documented type | |
713 | // requirements for the handler. | |
714 | ||
715 | static_assert( | |
716 | beast::detail::is_invocable<ReadHandler, | |
717 | void(error_code, std::size_t)>::value, | |
718 | "ReadHandler type requirements not met"); | |
719 | ||
720 | read_some_op< | |
721 | typename std::decay<ReadHandler>::type, | |
722 | MutableBufferSequence>( | |
723 | std::forward<ReadHandler>(h), | |
724 | sp, | |
725 | b); | |
726 | } | |
727 | }; | |
728 | ||
729 | template<class NextLayer, bool deflateSupported> | |
730 | struct stream<NextLayer, deflateSupported>:: | |
731 | run_read_op | |
732 | { | |
733 | template< | |
734 | class ReadHandler, | |
735 | class DynamicBuffer> | |
736 | void | |
737 | operator()( | |
738 | ReadHandler&& h, | |
739 | boost::shared_ptr<impl_type> const& sp, | |
740 | DynamicBuffer* b, | |
741 | std::size_t limit, | |
742 | bool some) | |
743 | { | |
744 | // If you get an error on the following line it means | |
745 | // that your handler does not meet the documented type | |
746 | // requirements for the handler. | |
747 | ||
748 | static_assert( | |
749 | beast::detail::is_invocable<ReadHandler, | |
750 | void(error_code, std::size_t)>::value, | |
751 | "ReadHandler type requirements not met"); | |
752 | ||
753 | read_op< | |
754 | typename std::decay<ReadHandler>::type, | |
755 | DynamicBuffer>( | |
756 | std::forward<ReadHandler>(h), | |
757 | sp, | |
758 | *b, | |
759 | limit, | |
760 | some); | |
761 | } | |
762 | }; | |
763 | ||
764 | //------------------------------------------------------------------------------ | |
765 | ||
766 | template<class NextLayer, bool deflateSupported> | |
767 | template<class DynamicBuffer> | |
768 | std::size_t | |
769 | stream<NextLayer, deflateSupported>:: | |
770 | read(DynamicBuffer& buffer) | |
771 | { | |
772 | static_assert(is_sync_stream<next_layer_type>::value, | |
773 | "SyncStream type requirements not met"); | |
774 | static_assert( | |
775 | net::is_dynamic_buffer<DynamicBuffer>::value, | |
776 | "DynamicBuffer type requirements not met"); | |
777 | error_code ec; | |
778 | auto const bytes_written = read(buffer, ec); | |
779 | if(ec) | |
780 | BOOST_THROW_EXCEPTION(system_error{ec}); | |
781 | return bytes_written; | |
782 | } | |
783 | ||
784 | template<class NextLayer, bool deflateSupported> | |
785 | template<class DynamicBuffer> | |
786 | std::size_t | |
787 | stream<NextLayer, deflateSupported>:: | |
788 | read(DynamicBuffer& buffer, error_code& ec) | |
789 | { | |
790 | static_assert(is_sync_stream<next_layer_type>::value, | |
791 | "SyncStream type requirements not met"); | |
792 | static_assert( | |
793 | net::is_dynamic_buffer<DynamicBuffer>::value, | |
794 | "DynamicBuffer type requirements not met"); | |
795 | std::size_t bytes_written = 0; | |
796 | do | |
797 | { | |
798 | bytes_written += read_some(buffer, 0, ec); | |
799 | if(ec) | |
800 | return bytes_written; | |
801 | } | |
802 | while(! is_message_done()); | |
803 | return bytes_written; | |
804 | } | |
805 | ||
806 | template<class NextLayer, bool deflateSupported> | |
807 | template<class DynamicBuffer, class ReadHandler> | |
808 | BOOST_BEAST_ASYNC_RESULT2(ReadHandler) | |
809 | stream<NextLayer, deflateSupported>:: | |
810 | async_read(DynamicBuffer& buffer, ReadHandler&& handler) | |
811 | { | |
812 | static_assert(is_async_stream<next_layer_type>::value, | |
813 | "AsyncStream type requirements not met"); | |
814 | static_assert( | |
815 | net::is_dynamic_buffer<DynamicBuffer>::value, | |
816 | "DynamicBuffer type requirements not met"); | |
817 | return net::async_initiate< | |
818 | ReadHandler, | |
819 | void(error_code, std::size_t)>( | |
820 | run_read_op{}, | |
821 | handler, | |
822 | impl_, | |
823 | &buffer, | |
824 | 0, | |
825 | false); | |
826 | } | |
827 | ||
828 | //------------------------------------------------------------------------------ | |
829 | ||
830 | template<class NextLayer, bool deflateSupported> | |
831 | template<class DynamicBuffer> | |
832 | std::size_t | |
833 | stream<NextLayer, deflateSupported>:: | |
834 | read_some( | |
835 | DynamicBuffer& buffer, | |
836 | std::size_t limit) | |
837 | { | |
838 | static_assert(is_sync_stream<next_layer_type>::value, | |
839 | "SyncStream type requirements not met"); | |
840 | static_assert( | |
841 | net::is_dynamic_buffer<DynamicBuffer>::value, | |
842 | "DynamicBuffer type requirements not met"); | |
843 | error_code ec; | |
844 | auto const bytes_written = | |
845 | read_some(buffer, limit, ec); | |
846 | if(ec) | |
847 | BOOST_THROW_EXCEPTION(system_error{ec}); | |
848 | return bytes_written; | |
849 | } | |
850 | ||
851 | template<class NextLayer, bool deflateSupported> | |
852 | template<class DynamicBuffer> | |
853 | std::size_t | |
854 | stream<NextLayer, deflateSupported>:: | |
855 | read_some( | |
856 | DynamicBuffer& buffer, | |
857 | std::size_t limit, | |
858 | error_code& ec) | |
859 | { | |
860 | static_assert(is_sync_stream<next_layer_type>::value, | |
861 | "SyncStream type requirements not met"); | |
862 | static_assert( | |
863 | net::is_dynamic_buffer<DynamicBuffer>::value, | |
864 | "DynamicBuffer type requirements not met"); | |
865 | using beast::detail::clamp; | |
866 | if(! limit) | |
867 | limit = (std::numeric_limits<std::size_t>::max)(); | |
868 | auto const size = | |
869 | clamp(read_size_hint(buffer), limit); | |
870 | BOOST_ASSERT(size > 0); | |
871 | auto mb = beast::detail::dynamic_buffer_prepare( | |
872 | buffer, size, ec, error::buffer_overflow); | |
873 | if(impl_->check_stop_now(ec)) | |
874 | return 0; | |
875 | auto const bytes_written = read_some(*mb, ec); | |
876 | buffer.commit(bytes_written); | |
877 | return bytes_written; | |
878 | } | |
879 | ||
880 | template<class NextLayer, bool deflateSupported> | |
881 | template<class DynamicBuffer, class ReadHandler> | |
882 | BOOST_BEAST_ASYNC_RESULT2(ReadHandler) | |
883 | stream<NextLayer, deflateSupported>:: | |
884 | async_read_some( | |
885 | DynamicBuffer& buffer, | |
886 | std::size_t limit, | |
887 | ReadHandler&& handler) | |
888 | { | |
889 | static_assert(is_async_stream<next_layer_type>::value, | |
890 | "AsyncStream type requirements not met"); | |
891 | static_assert( | |
892 | net::is_dynamic_buffer<DynamicBuffer>::value, | |
893 | "DynamicBuffer type requirements not met"); | |
894 | return net::async_initiate< | |
895 | ReadHandler, | |
896 | void(error_code, std::size_t)>( | |
897 | run_read_op{}, | |
898 | handler, | |
899 | impl_, | |
900 | &buffer, | |
901 | limit, | |
902 | true); | |
903 | } | |
904 | ||
905 | //------------------------------------------------------------------------------ | |
906 | ||
907 | template<class NextLayer, bool deflateSupported> | |
908 | template<class MutableBufferSequence> | |
909 | std::size_t | |
910 | stream<NextLayer, deflateSupported>:: | |
911 | read_some( | |
912 | MutableBufferSequence const& buffers) | |
913 | { | |
914 | static_assert(is_sync_stream<next_layer_type>::value, | |
915 | "SyncStream type requirements not met"); | |
916 | static_assert(net::is_mutable_buffer_sequence< | |
917 | MutableBufferSequence>::value, | |
918 | "MutableBufferSequence type requirements not met"); | |
919 | error_code ec; | |
920 | auto const bytes_written = read_some(buffers, ec); | |
921 | if(ec) | |
922 | BOOST_THROW_EXCEPTION(system_error{ec}); | |
923 | return bytes_written; | |
924 | } | |
925 | ||
// Read some message payload into `buffers` using blocking reads on the
// next layer; failures are reported through `ec`.
//
// Work done per call, in order:
//   1. If no payload remains and either the last frame was not final or the
//      message is marked done, parse frame headers from rd_buf (reading more
//      from the stream as needed). Control frames are handled inline:
//      ping  -> optional ctrl_cb, then a pong is written back (skipped when
//               wr_close is set);
//      pong  -> optional ctrl_cb;
//      close -> reason stored in impl.cr, optional ctrl_cb, then do_fail
//               with error::closed.
//   2. If the message is not deflated, payload is copied out of rd_buf or
//      read straight into `buffers`, unmasked when the frame is masked, and
//      UTF-8 checked for text messages.
//   3. Otherwise payload is inflated into `buffers`, limited by rd_msg_max
//      when that is non-zero, and UTF-8 checked afterwards.
//
// Returns bytes_written, the running count of payload bytes accounted to
// `buffers` during this call. Every early exit (check_stop_now(ec) true, or
// after do_fail) returns the count accumulated so far; bytes moved by the
// failing step are not added to it.
926 | template<class NextLayer, bool deflateSupported> | |
927 | template<class MutableBufferSequence> | |
928 | std::size_t | |
929 | stream<NextLayer, deflateSupported>:: | |
930 | read_some( | |
931 | MutableBufferSequence const& buffers, | |
932 | error_code& ec) | |
933 | { | |
934 | static_assert(is_sync_stream<next_layer_type>::value, | |
935 | "SyncStream type requirements not met"); | |
936 | static_assert(net::is_mutable_buffer_sequence< | |
937 | MutableBufferSequence>::value, | |
938 | "MutableBufferSequence type requirements not met"); | |
939 | using beast::detail::clamp; | |
940 | auto& impl = *impl_; | |
941 | close_code code{}; | |
942 | std::size_t bytes_written = 0; | |
943 | ec = {}; | |
944 | // Make sure the stream is open | |
945 | if(impl.check_stop_now(ec)) | |
946 | return bytes_written; | |
// Re-entered through `goto loop` after a ping, a pong, or an empty
// non-final frame has been dealt with.
947 | loop: | |
948 | // See if we need to read a frame header. This | |
949 | // condition is structured to give the decompressor | |
950 | // a chance to emit the final empty deflate block | |
951 | // | |
952 | if(impl.rd_remain == 0 && ( | |
953 | ! impl.rd_fh.fin || impl.rd_done)) | |
954 | { | |
955 | // Read frame header | |
956 | error_code result; | |
// Loop until parse_fh returns true. A non-empty `result` fails the
// connection; otherwise more bytes are read from the stream into rd_buf.
957 | while(! impl.parse_fh(impl.rd_fh, impl.rd_buf, result)) | |
958 | { | |
959 | if(result) | |
960 | { | |
961 | // _Fail the WebSocket Connection_ | |
962 | if(result == error::message_too_big) | |
963 | code = close_code::too_big; | |
964 | else | |
965 | code = close_code::protocol_error; | |
966 | do_fail(code, result, ec); | |
967 | return bytes_written; | |
968 | } | |
969 | auto const bytes_transferred = | |
970 | impl.stream().read_some( | |
971 | impl.rd_buf.prepare(read_size( | |
972 | impl.rd_buf, impl.rd_buf.max_size())), | |
973 | ec); | |
974 | impl.rd_buf.commit(bytes_transferred); | |
975 | if(impl.check_stop_now(ec)) | |
976 | return bytes_written; | |
977 | } | |
978 | // Immediately apply the mask to the portion | |
979 | // of the buffer holding payload data. | |
980 | if(impl.rd_fh.len > 0 && impl.rd_fh.mask) | |
981 | detail::mask_inplace(buffers_prefix( | |
982 | clamp(impl.rd_fh.len), impl.rd_buf.data()), | |
983 | impl.rd_key); | |
984 | if(detail::is_control(impl.rd_fh.op)) | |
985 | { | |
986 | // Get control frame payload | |
987 | auto const b = buffers_prefix( | |
988 | clamp(impl.rd_fh.len), impl.rd_buf.data()); | |
989 | auto const len = buffer_bytes(b); | |
990 | BOOST_ASSERT(len == impl.rd_fh.len); | |
991 | ||
992 | // Clear this otherwise the next | |
993 | // frame will be considered final. | |
994 | impl.rd_fh.fin = false; | |
995 | ||
996 | // Handle ping frame | |
997 | if(impl.rd_fh.op == detail::opcode::ping) | |
998 | { | |
999 | ping_data payload; | |
1000 | detail::read_ping(payload, b); | |
1001 | impl.rd_buf.consume(len); | |
1002 | if(impl.wr_close) | |
1003 | { | |
1004 | // Ignore ping when closing | |
1005 | goto loop; | |
1006 | } | |
1007 | if(impl.ctrl_cb) | |
1008 | impl.ctrl_cb(frame_type::ping, payload); | |
// Answer with a pong frame carrying the same payload; the
// frame is written to the stream before reading continues.
1009 | detail::frame_buffer fb; | |
1010 | impl.template write_ping<flat_static_buffer_base>(fb, | |
1011 | detail::opcode::pong, payload); | |
1012 | net::write(impl.stream(), fb.data(), ec); | |
1013 | if(impl.check_stop_now(ec)) | |
1014 | return bytes_written; | |
1015 | goto loop; | |
1016 | } | |
1017 | // Handle pong frame | |
1018 | if(impl.rd_fh.op == detail::opcode::pong) | |
1019 | { | |
1020 | ping_data payload; | |
1021 | detail::read_ping(payload, b); | |
1022 | impl.rd_buf.consume(len); | |
1023 | if(impl.ctrl_cb) | |
1024 | impl.ctrl_cb(frame_type::pong, payload); | |
1025 | goto loop; | |
1026 | } | |
1027 | // Handle close frame | |
1028 | BOOST_ASSERT(impl.rd_fh.op == detail::opcode::close); | |
1029 | { | |
1030 | BOOST_ASSERT(! impl.rd_close); | |
1031 | impl.rd_close = true; | |
1032 | close_reason cr; | |
1033 | detail::read_close(cr, b, result); | |
1034 | if(result) | |
1035 | { | |
1036 | // _Fail the WebSocket Connection_ | |
1037 | do_fail(close_code::protocol_error, | |
1038 | result, ec); | |
1039 | return bytes_written; | |
1040 | } | |
// Store the peer's close reason, then drop the frame payload
// from rd_buf before notifying the control callback.
1041 | impl.cr = cr; | |
1042 | impl.rd_buf.consume(len); | |
1043 | if(impl.ctrl_cb) | |
1044 | impl.ctrl_cb(frame_type::close, impl.cr.reason); | |
1045 | BOOST_ASSERT(! impl.wr_close); | |
1046 | // _Start the WebSocket Closing Handshake_ | |
1047 | do_fail( | |
1048 | cr.code == close_code::none ? | |
1049 | close_code::normal : | |
1050 | static_cast<close_code>(cr.code), | |
1051 | error::closed, ec); | |
1052 | return bytes_written; | |
1053 | } | |
1054 | } | |
1055 | if(impl.rd_fh.len == 0 && ! impl.rd_fh.fin) | |
1056 | { | |
1057 | // Empty non-final frame | |
1058 | goto loop; | |
1059 | } | |
// Only non-control frames reach this point (every control-frame
// branch above either jumps to `loop` or returns).
1060 | impl.rd_done = false; | |
1061 | } | |
1062 | else | |
1063 | { | |
// No frame header was needed on this pass; begin payload
// processing with a cleared error code.
1064 | ec = {}; | |
1065 | } | |
// Payload path for a message that is not deflated.
1066 | if(! impl.rd_deflated()) | |
1067 | { | |
1068 | if(impl.rd_remain > 0) | |
1069 | { | |
1070 | if(impl.rd_buf.size() == 0 && impl.rd_buf.max_size() > | |
1071 | (std::min)(clamp(impl.rd_remain), | |
1072 | buffer_bytes(buffers))) | |
1073 | { | |
1074 | // Fill the read buffer first, otherwise we | |
1075 | // get fewer bytes at the cost of one I/O. | |
1076 | impl.rd_buf.commit(impl.stream().read_some( | |
1077 | impl.rd_buf.prepare(read_size(impl.rd_buf, | |
1078 | impl.rd_buf.max_size())), ec)); | |
1079 | if(impl.check_stop_now(ec)) | |
1080 | return bytes_written; | |
1081 | if(impl.rd_fh.mask) | |
1082 | detail::mask_inplace( | |
1083 | buffers_prefix(clamp(impl.rd_remain), | |
1084 | impl.rd_buf.data()), impl.rd_key); | |
1085 | } | |
1086 | if(impl.rd_buf.size() > 0) | |
1087 | { | |
1088 | // Copy from the read buffer. | |
1089 | // The mask was already applied. | |
1090 | auto const bytes_transferred = net::buffer_copy( | |
1091 | buffers, impl.rd_buf.data(), | |
1092 | clamp(impl.rd_remain)); | |
1093 | auto const mb = buffers_prefix( | |
1094 | bytes_transferred, buffers); | |
1095 | impl.rd_remain -= bytes_transferred; | |
1096 | if(impl.rd_op == detail::opcode::text) | |
1097 | { | |
1098 | if(! impl.rd_utf8.write(mb) || | |
1099 | (impl.rd_remain == 0 && impl.rd_fh.fin && | |
1100 | ! impl.rd_utf8.finish())) | |
1101 | { | |
1102 | // _Fail the WebSocket Connection_ | |
1103 | do_fail(close_code::bad_payload, | |
1104 | error::bad_frame_payload, ec); | |
1105 | return bytes_written; | |
1106 | } | |
1107 | } | |
1108 | bytes_written += bytes_transferred; | |
1109 | impl.rd_size += bytes_transferred; | |
1110 | impl.rd_buf.consume(bytes_transferred); | |
1111 | } | |
1112 | else | |
1113 | { | |
1114 | // Read into caller's buffer | |
1115 | BOOST_ASSERT(impl.rd_remain > 0); | |
1116 | BOOST_ASSERT(buffer_bytes(buffers) > 0); | |
1117 | BOOST_ASSERT(buffer_bytes(buffers_prefix( | |
1118 | clamp(impl.rd_remain), buffers)) > 0); | |
1119 | auto const bytes_transferred = | |
1120 | impl.stream().read_some(buffers_prefix( | |
1121 | clamp(impl.rd_remain), buffers), ec); | |
1122 | // VFALCO What if some bytes were written? | |
1123 | if(impl.check_stop_now(ec)) | |
1124 | return bytes_written; | |
1125 | BOOST_ASSERT(bytes_transferred > 0); | |
1126 | auto const mb = buffers_prefix( | |
1127 | bytes_transferred, buffers); | |
1128 | impl.rd_remain -= bytes_transferred; | |
1129 | if(impl.rd_fh.mask) | |
1130 | detail::mask_inplace(mb, impl.rd_key); | |
1131 | if(impl.rd_op == detail::opcode::text) | |
1132 | { | |
1133 | if(! impl.rd_utf8.write(mb) || | |
1134 | (impl.rd_remain == 0 && impl.rd_fh.fin && | |
1135 | ! impl.rd_utf8.finish())) | |
1136 | { | |
1137 | // _Fail the WebSocket Connection_ | |
1138 | do_fail(close_code::bad_payload, | |
1139 | error::bad_frame_payload, ec); | |
1140 | return bytes_written; | |
1141 | } | |
1142 | } | |
1143 | bytes_written += bytes_transferred; | |
1144 | impl.rd_size += bytes_transferred; | |
1145 | } | |
1146 | } | |
// Mark the message done once the final frame has no payload remaining.
f67539c2 TL |
1147 | BOOST_ASSERT( ! impl.rd_done ); |
1148 | if( impl.rd_remain == 0 && impl.rd_fh.fin ) | |
1149 | impl.rd_done = true; | |
92f5a8d4 TL |
1150 | } |
1151 | else | |
1152 | { | |
1153 | // Read compressed message frame payload: | |
1154 | // inflate even if rd_fh_.len == 0, otherwise we | |
1155 | // never emit the end-of-stream deflate block. | |
1156 | // | |
// did_read permits at most one stream read inside the loop below;
// a second request for input breaks out of the loop instead.
1157 | bool did_read = false; | |
1158 | buffers_suffix<MutableBufferSequence> cb(buffers); | |
// Inflate into the front buffer of `cb` on each pass; `cb` shrinks by
// the bytes produced, so the loop ends when no output space is left.
1159 | while(buffer_bytes(cb) > 0) | |
1160 | { | |
1161 | zlib::z_params zs; | |
1162 | { | |
1163 | auto const out = beast::buffers_front(cb); | |
1164 | zs.next_out = out.data(); | |
1165 | zs.avail_out = out.size(); | |
1166 | BOOST_ASSERT(zs.avail_out > 0); | |
1167 | } | |
1168 | if(impl.rd_remain > 0) | |
1169 | { | |
1170 | if(impl.rd_buf.size() > 0) | |
1171 | { | |
1172 | // use what's there | |
1173 | auto const in = buffers_prefix( | |
1174 | clamp(impl.rd_remain), beast::buffers_front( | |
1175 | impl.rd_buf.data())); | |
1176 | zs.avail_in = in.size(); | |
1177 | zs.next_in = in.data(); | |
1178 | } | |
1179 | else if(! did_read) | |
1180 | { | |
1181 | // read new | |
1182 | auto const bytes_transferred = | |
1183 | impl.stream().read_some( | |
1184 | impl.rd_buf.prepare(read_size( | |
1185 | impl.rd_buf, impl.rd_buf.max_size())), | |
1186 | ec); | |
1187 | if(impl.check_stop_now(ec)) | |
1188 | return bytes_written; | |
1189 | BOOST_ASSERT(bytes_transferred > 0); | |
1190 | impl.rd_buf.commit(bytes_transferred); | |
1191 | if(impl.rd_fh.mask) | |
1192 | detail::mask_inplace( | |
1193 | buffers_prefix(clamp(impl.rd_remain), | |
1194 | impl.rd_buf.data()), impl.rd_key); | |
1195 | auto const in = buffers_prefix( | |
1196 | clamp(impl.rd_remain), buffers_front( | |
1197 | impl.rd_buf.data())); | |
1198 | zs.avail_in = in.size(); | |
1199 | zs.next_in = in.data(); | |
1200 | did_read = true; | |
1201 | } | |
1202 | else | |
1203 | { | |
1204 | break; | |
1205 | } | |
1206 | } | |
1207 | else if(impl.rd_fh.fin) | |
1208 | { | |
1209 | // append the empty block codes | |
1210 | static std::uint8_t constexpr | |
1211 | empty_block[4] = { | |
1212 | 0x00, 0x00, 0xff, 0xff }; | |
1213 | zs.next_in = empty_block; | |
1214 | zs.avail_in = sizeof(empty_block); | |
1215 | impl.inflate(zs, zlib::Flush::sync, ec); | |
1216 | if(! ec) | |
1217 | { | |
1218 | // https://github.com/madler/zlib/issues/280 | |
1219 | if(zs.total_out > 0) | |
1220 | ec = error::partial_deflate_block; | |
1221 | } | |
1222 | if(impl.check_stop_now(ec)) | |
1223 | return bytes_written; | |
1224 | impl.do_context_takeover_read(impl.role); | |
1225 | impl.rd_done = true; | |
1226 | break; | |
1227 | } | |
1228 | else | |
1229 | { | |
1230 | break; | |
1231 | } | |
// Inflate the selected input, then account for the bytes produced
// (total_out) and the bytes consumed (total_in).
1232 | impl.inflate(zs, zlib::Flush::sync, ec); | |
1233 | if(impl.check_stop_now(ec)) | |
1234 | return bytes_written; | |
// Only enforced when rd_msg_max is non-zero: fail with
// message_too_big when sum_exceeds reports the limit is passed.
1235 | if(impl.rd_msg_max && beast::detail::sum_exceeds( | |
1236 | impl.rd_size, zs.total_out, impl.rd_msg_max)) | |
1237 | { | |
1238 | do_fail(close_code::too_big, | |
1239 | error::message_too_big, ec); | |
1240 | return bytes_written; | |
1241 | } | |
1242 | cb.consume(zs.total_out); | |
1243 | impl.rd_size += zs.total_out; | |
1244 | impl.rd_remain -= zs.total_in; | |
1245 | impl.rd_buf.consume(zs.total_in); | |
1246 | bytes_written += zs.total_out; | |
1247 | } | |
// Text messages: validate the first bytes_written bytes of `buffers`
// and finish the validator once rd_done has been set.
1248 | if(impl.rd_op == detail::opcode::text) | |
1249 | { | |
1250 | // check utf8 | |
1251 | if(! impl.rd_utf8.write(beast::buffers_prefix( | |
1252 | bytes_written, buffers)) || ( | |
1253 | impl.rd_done && ! impl.rd_utf8.finish())) | |
1254 | { | |
1255 | // _Fail the WebSocket Connection_ | |
1256 | do_fail(close_code::bad_payload, | |
1257 | error::bad_frame_payload, ec); | |
1258 | return bytes_written; | |
1259 | } | |
1260 | } | |
1261 | } | |
1262 | return bytes_written; | |
1263 | } | |
1264 | ||
1265 | template<class NextLayer, bool deflateSupported> | |
1266 | template<class MutableBufferSequence, class ReadHandler> | |
1267 | BOOST_BEAST_ASYNC_RESULT2(ReadHandler) | |
1268 | stream<NextLayer, deflateSupported>:: | |
1269 | async_read_some( | |
1270 | MutableBufferSequence const& buffers, | |
1271 | ReadHandler&& handler) | |
1272 | { | |
1273 | static_assert(is_async_stream<next_layer_type>::value, | |
1274 | "AsyncStream type requirements not met"); | |
1275 | static_assert(net::is_mutable_buffer_sequence< | |
1276 | MutableBufferSequence>::value, | |
1277 | "MutableBufferSequence type requirements not met"); | |
1278 | return net::async_initiate< | |
1279 | ReadHandler, | |
1280 | void(error_code, std::size_t)>( | |
1281 | run_read_some_op{}, | |
1282 | handler, | |
1283 | impl_, | |
1284 | buffers); | |
1285 | } | |
1286 | ||
1287 | } // websocket | |
1288 | } // beast | |
1289 | } // boost | |
1290 | ||
1291 | #endif |