//
// Copyright (c) 2013-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//

#ifndef BEAST_WEBSOCKET_IMPL_WRITE_IPP
#define BEAST_WEBSOCKET_IMPL_WRITE_IPP

#include <beast/core/bind_handler.hpp>
#include <beast/core/buffer_cat.hpp>
#include <beast/core/buffer_concepts.hpp>
#include <beast/core/consuming_buffers.hpp>
#include <beast/core/handler_helpers.hpp>
#include <beast/core/handler_ptr.hpp>
#include <beast/core/prepare_buffers.hpp>
#include <beast/core/static_streambuf.hpp>
#include <beast/core/stream_concepts.hpp>
#include <beast/core/detail/clamp.hpp>
#include <beast/websocket/detail/frame.hpp>
#include <boost/assert.hpp>
#include <algorithm>
#include <memory>

namespace beast {
namespace websocket {

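// Composed asynchronous operation which writes one frame of a
// WebSocket message. It is implemented as a switch-based state
// machine; the operation may suspend itself (see do_maybe_suspend
// below) when another operation holds the write block, which lets
// outgoing control frames be interleaved between message frames.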
template<class NextLayer>
template<class Buffers, class Handler>
class stream<NextLayer>::write_frame_op
{
    struct data : op
    {
        Handler& handler;
        bool cont;
        stream<NextLayer>& ws;
        consuming_buffers<Buffers> cb;
        bool fin;
        detail::frame_header fh;
        detail::fh_streambuf fh_buf;
        detail::prepared_key key;
        std::uint64_t remain;
        int state = 0;
        int entry_state;

        data(Handler& handler_, stream<NextLayer>& ws_,
                bool fin_, Buffers const& bs)
            : handler(handler_)
            , cont(beast_asio_helpers::
                is_continuation(handler))
            , ws(ws_)
            , cb(bs)
            , fin(fin_)
        {
        }
    };

    handler_ptr<data, Handler> d_;

public:
    write_frame_op(write_frame_op&&) = default;
    write_frame_op(write_frame_op const&) = default;

    template<class DeducedHandler, class... Args>
    write_frame_op(DeducedHandler&& h,
            stream<NextLayer>& ws, Args&&... args)
        : d_(std::forward<DeducedHandler>(h),
            ws, std::forward<Args>(args)...)
    {
        (*this)(error_code{}, 0, false);
    }

    void operator()()
    {
        (*this)(error_code{}, 0, true);
    }

    void operator()(error_code const& ec)
    {
        (*this)(ec, 0, true);
    }

    void operator()(error_code ec,
        std::size_t bytes_transferred);

    void operator()(error_code ec,
        std::size_t bytes_transferred, bool again);

    friend
    void* asio_handler_allocate(
        std::size_t size, write_frame_op* op)
    {
        return beast_asio_helpers::
            allocate(size, op->d_.handler());
    }

    friend
    void asio_handler_deallocate(
        void* p, std::size_t size, write_frame_op* op)
    {
        return beast_asio_helpers::
            deallocate(p, size, op->d_.handler());
    }

    friend
    bool asio_handler_is_continuation(write_frame_op* op)
    {
        return op->d_->cont;
    }

    template<class Function>
    friend
    void asio_handler_invoke(Function&& f, write_frame_op* op)
    {
        return beast_asio_helpers::
            invoke(f, op->d_.handler());
    }
};

120
121 template<class NextLayer>
122 template<class Buffers, class Handler>
123 void
124 stream<NextLayer>::
125 write_frame_op<Buffers, Handler>::
126 operator()(error_code ec, std::size_t bytes_transferred)
127 {
128 auto& d = *d_;
129 if(ec)
130 d.ws.failed_ = true;
131 (*this)(ec, bytes_transferred, true);
132 }
133
134 template<class NextLayer>
135 template<class Buffers, class Handler>
136 void
137 stream<NextLayer>::
138 write_frame_op<Buffers, Handler>::
139 operator()(error_code ec,
140 std::size_t bytes_transferred, bool again)
141 {
142 using beast::detail::clamp;
143 using boost::asio::buffer;
144 using boost::asio::buffer_copy;
145 using boost::asio::buffer_size;
146 enum
147 {
148 do_init = 0,
149 do_nomask_nofrag = 20,
150 do_nomask_frag = 30,
151 do_mask_nofrag = 40,
152 do_mask_frag = 50,
153 do_deflate = 60,
154 do_maybe_suspend = 80,
155 do_upcall = 99
156 };
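    // Each base state above is an entry point which acquires the
    // write block; the corresponding "+ 1" state is the resume point
    // used when the block is already held, reached via the
    // entry_state + 1 transition in do_maybe_suspend + 2 below.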
    auto& d = *d_;
    d.cont = d.cont || again;
    if(ec)
        goto upcall;
    for(;;)
    {
        switch(d.state)
        {
        case do_init:
            if(! d.ws.wr_.cont)
            {
                d.ws.wr_begin();
                d.fh.rsv1 = d.ws.wr_.compress;
            }
            else
            {
                d.fh.rsv1 = false;
            }
            d.fh.rsv2 = false;
            d.fh.rsv3 = false;
            d.fh.op = d.ws.wr_.cont ?
                opcode::cont : d.ws.wr_opcode_;
            d.fh.mask =
                d.ws.role_ == detail::role_type::client;

            // entry_state determines which algorithm
            // we will use to send. If we suspend, we
            // will transition to entry_state + 1 on
            // the resume.
            if(d.ws.wr_.compress)
            {
                d.entry_state = do_deflate;
            }
            else if(! d.fh.mask)
            {
                if(! d.ws.wr_.autofrag)
                {
                    d.entry_state = do_nomask_nofrag;
                }
                else
                {
                    BOOST_ASSERT(d.ws.wr_.buf_size != 0);
                    d.remain = buffer_size(d.cb);
                    if(d.remain > d.ws.wr_.buf_size)
                        d.entry_state = do_nomask_frag;
                    else
                        d.entry_state = do_nomask_nofrag;
                }
            }
            else
            {
                if(! d.ws.wr_.autofrag)
                {
                    d.entry_state = do_mask_nofrag;
                }
                else
                {
                    BOOST_ASSERT(d.ws.wr_.buf_size != 0);
                    d.remain = buffer_size(d.cb);
                    if(d.remain > d.ws.wr_.buf_size)
                        d.entry_state = do_mask_frag;
                    else
                        d.entry_state = do_mask_nofrag;
                }
            }
            d.state = do_maybe_suspend;
            break;

        //----------------------------------------------------------------------

        case do_nomask_nofrag:
            BOOST_ASSERT(! d.ws.wr_block_);
            d.ws.wr_block_ = &d;
            // [[fallthrough]]

        case do_nomask_nofrag + 1:
        {
            BOOST_ASSERT(d.ws.wr_block_ == &d);
            d.fh.fin = d.fin;
            d.fh.len = buffer_size(d.cb);
            detail::write<static_streambuf>(
                d.fh_buf, d.fh);
            d.ws.wr_.cont = ! d.fin;
            // Send frame
            d.state = do_upcall;
            boost::asio::async_write(d.ws.stream_,
                buffer_cat(d.fh_buf.data(), d.cb),
                std::move(*this));
            return;
        }

        //----------------------------------------------------------------------

        case do_nomask_frag:
            BOOST_ASSERT(! d.ws.wr_block_);
            d.ws.wr_block_ = &d;
            // [[fallthrough]]

        case do_nomask_frag + 1:
        {
            BOOST_ASSERT(d.ws.wr_block_ == &d);
            auto const n = clamp(
                d.remain, d.ws.wr_.buf_size);
            d.remain -= n;
            d.fh.len = n;
            d.fh.fin = d.fin ? d.remain == 0 : false;
            detail::write<static_streambuf>(
                d.fh_buf, d.fh);
            d.ws.wr_.cont = ! d.fin;
            // Send frame
            d.state = d.remain == 0 ?
                do_upcall : do_nomask_frag + 2;
            boost::asio::async_write(d.ws.stream_,
                buffer_cat(d.fh_buf.data(),
                    prepare_buffers(n, d.cb)),
                std::move(*this));
            return;
        }

        case do_nomask_frag + 2:
            d.cb.consume(
                bytes_transferred - d.fh_buf.size());
            d.fh_buf.reset();
            d.fh.op = opcode::cont;
            if(d.ws.wr_block_ == &d)
                d.ws.wr_block_ = nullptr;
            // Allow outgoing control frames to
            // be sent in between message frames:
            if(d.ws.rd_op_.maybe_invoke() ||
                d.ws.ping_op_.maybe_invoke())
            {
                d.state = do_maybe_suspend;
                d.ws.get_io_service().post(
                    std::move(*this));
                return;
            }
            d.state = d.entry_state;
            break;

        //----------------------------------------------------------------------

        case do_mask_nofrag:
            BOOST_ASSERT(! d.ws.wr_block_);
            d.ws.wr_block_ = &d;
            // [[fallthrough]]

        case do_mask_nofrag + 1:
        {
            BOOST_ASSERT(d.ws.wr_block_ == &d);
            d.remain = buffer_size(d.cb);
            d.fh.fin = d.fin;
            d.fh.len = d.remain;
            d.fh.key = d.ws.maskgen_();
            detail::prepare_key(d.key, d.fh.key);
            detail::write<static_streambuf>(
                d.fh_buf, d.fh);
            auto const n =
                clamp(d.remain, d.ws.wr_.buf_size);
            auto const b =
                buffer(d.ws.wr_.buf.get(), n);
            buffer_copy(b, d.cb);
            detail::mask_inplace(b, d.key);
            d.remain -= n;
            d.ws.wr_.cont = ! d.fin;
            // Send frame header and partial payload
            d.state = d.remain == 0 ?
                do_upcall : do_mask_nofrag + 2;
            boost::asio::async_write(d.ws.stream_,
                buffer_cat(d.fh_buf.data(), b),
                std::move(*this));
            return;
        }

        case do_mask_nofrag + 2:
        {
            d.cb.consume(d.ws.wr_.buf_size);
            auto const n =
                clamp(d.remain, d.ws.wr_.buf_size);
            auto const b =
                buffer(d.ws.wr_.buf.get(), n);
            buffer_copy(b, d.cb);
            detail::mask_inplace(b, d.key);
            d.remain -= n;
            // Send partial payload
            if(d.remain == 0)
                d.state = do_upcall;
            boost::asio::async_write(
                d.ws.stream_, b, std::move(*this));
            return;
        }

        //----------------------------------------------------------------------

        case do_mask_frag:
            BOOST_ASSERT(! d.ws.wr_block_);
            d.ws.wr_block_ = &d;
            // [[fallthrough]]

        case do_mask_frag + 1:
        {
            BOOST_ASSERT(d.ws.wr_block_ == &d);
            auto const n = clamp(
                d.remain, d.ws.wr_.buf_size);
            d.remain -= n;
            d.fh.len = n;
            d.fh.key = d.ws.maskgen_();
            d.fh.fin = d.fin ? d.remain == 0 : false;
            detail::prepare_key(d.key, d.fh.key);
            auto const b = buffer(
                d.ws.wr_.buf.get(), n);
            buffer_copy(b, d.cb);
            detail::mask_inplace(b, d.key);
            detail::write<static_streambuf>(
                d.fh_buf, d.fh);
            d.ws.wr_.cont = ! d.fin;
            // Send frame
            d.state = d.remain == 0 ?
                do_upcall : do_mask_frag + 2;
            boost::asio::async_write(d.ws.stream_,
                buffer_cat(d.fh_buf.data(), b),
                std::move(*this));
            return;
        }

        case do_mask_frag + 2:
            d.cb.consume(
                bytes_transferred - d.fh_buf.size());
            d.fh_buf.reset();
            d.fh.op = opcode::cont;
            BOOST_ASSERT(d.ws.wr_block_ == &d);
            d.ws.wr_block_ = nullptr;
            // Allow outgoing control frames to
            // be sent in between message frames:
            if(d.ws.rd_op_.maybe_invoke() ||
                d.ws.ping_op_.maybe_invoke())
            {
                d.state = do_maybe_suspend;
                d.ws.get_io_service().post(
                    std::move(*this));
                return;
            }
            d.state = d.entry_state;
            break;

        //----------------------------------------------------------------------

        case do_deflate:
            BOOST_ASSERT(! d.ws.wr_block_);
            d.ws.wr_block_ = &d;
            // [[fallthrough]]

        case do_deflate + 1:
        {
            BOOST_ASSERT(d.ws.wr_block_ == &d);
            auto b = buffer(d.ws.wr_.buf.get(),
                d.ws.wr_.buf_size);
            auto const more = detail::deflate(
                d.ws.pmd_->zo, b, d.cb, d.fin, ec);
            d.ws.failed_ = ec != 0;
            if(d.ws.failed_)
                goto upcall;
            auto const n = buffer_size(b);
            if(n == 0)
            {
                // The input was consumed, but there
                // is no output due to compression
                // latency.
                BOOST_ASSERT(! d.fin);
                BOOST_ASSERT(buffer_size(d.cb) == 0);

                // We could skip the dispatch if the
                // asynchronous initiation function is
                // not on the call stack, but that is
                // hard to determine, so be safe and
                // dispatch.
                d.state = do_upcall;
                d.ws.get_io_service().post(std::move(*this));
                return;
            }
            if(d.fh.mask)
            {
                d.fh.key = d.ws.maskgen_();
                detail::prepared_key key;
                detail::prepare_key(key, d.fh.key);
                detail::mask_inplace(b, key);
            }
            d.fh.fin = ! more;
            d.fh.len = n;
            detail::fh_streambuf fh_buf;
            detail::write<static_streambuf>(fh_buf, d.fh);
            d.ws.wr_.cont = ! d.fin;
            // Send frame
            d.state = more ?
                do_deflate + 2 : do_deflate + 3;
            boost::asio::async_write(d.ws.stream_,
                buffer_cat(fh_buf.data(), b),
                std::move(*this));
            return;
        }

        case do_deflate + 2:
            d.fh.op = opcode::cont;
            d.fh.rsv1 = false;
            BOOST_ASSERT(d.ws.wr_block_ == &d);
            d.ws.wr_block_ = nullptr;
            // Allow outgoing control frames to
            // be sent in between message frames:
            if(d.ws.rd_op_.maybe_invoke() ||
                d.ws.ping_op_.maybe_invoke())
            {
                d.state = do_maybe_suspend;
                d.ws.get_io_service().post(
                    std::move(*this));
                return;
            }
            d.state = d.entry_state;
            break;

        case do_deflate + 3:
            if(d.fh.fin && (
                (d.ws.role_ == detail::role_type::client &&
                    d.ws.pmd_config_.client_no_context_takeover) ||
                (d.ws.role_ == detail::role_type::server &&
                    d.ws.pmd_config_.server_no_context_takeover)))
                d.ws.pmd_->zo.reset();
            goto upcall;

        //----------------------------------------------------------------------

        case do_maybe_suspend:
        {
            if(d.ws.wr_block_)
            {
                // suspend
                d.state = do_maybe_suspend + 1;
                d.ws.wr_op_.template emplace<
                    write_frame_op>(std::move(*this));
                return;
            }
            if(d.ws.failed_ || d.ws.wr_close_)
            {
                // call handler
                d.state = do_upcall;
                d.ws.get_io_service().post(
                    bind_handler(std::move(*this),
                        boost::asio::error::operation_aborted));
                return;
            }
            d.state = d.entry_state;
            break;
        }

        case do_maybe_suspend + 1:
            BOOST_ASSERT(! d.ws.wr_block_);
            d.ws.wr_block_ = &d;
            d.state = do_maybe_suspend + 2;
            // The current context is safe but might not be
            // the same as the one for this operation (since
            // we are being called from a write operation).
            // Call post to make sure we are invoked the same
            // way as the final handler for this operation.
            d.ws.get_io_service().post(bind_handler(
                std::move(*this), ec));
            return;

        case do_maybe_suspend + 2:
            BOOST_ASSERT(d.ws.wr_block_ == &d);
            if(d.ws.failed_ || d.ws.wr_close_)
            {
                // call handler
                ec = boost::asio::error::operation_aborted;
                goto upcall;
            }
            d.state = d.entry_state + 1;
            break;

        //----------------------------------------------------------------------

        case do_upcall:
            goto upcall;
        }
    }
upcall:
    if(d.ws.wr_block_ == &d)
        d.ws.wr_block_ = nullptr;
    d.ws.rd_op_.maybe_invoke() ||
        d.ws.ping_op_.maybe_invoke();
    d_.invoke(ec);
}

template<class NextLayer>
template<class ConstBufferSequence, class WriteHandler>
typename async_completion<
    WriteHandler, void(error_code)>::result_type
stream<NextLayer>::
async_write_frame(bool fin,
    ConstBufferSequence const& bs, WriteHandler&& handler)
{
    static_assert(is_AsyncStream<next_layer_type>::value,
        "AsyncStream requirements not met");
    static_assert(beast::is_ConstBufferSequence<
        ConstBufferSequence>::value,
        "ConstBufferSequence requirements not met");
    beast::async_completion<
        WriteHandler, void(error_code)
    > completion{handler};
    write_frame_op<ConstBufferSequence, decltype(
        completion.handler)>{completion.handler,
            *this, fin, bs};
    return completion.result.get();
}
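
// A minimal usage sketch for async_write_frame; the stream `ws` and
// the string `s` are hypothetical, and setup/handshake are elided.
// Passing fin == false begins a fragmented message; later frames
// continue it, with fin == true on the last one:
//
//     std::string const s = "Hello";
//     ws.async_write_frame(false, boost::asio::buffer(s),
//         [](beast::error_code ec)
//         {
//             // First fragment sent, or ec is set on failure.
//         });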

template<class NextLayer>
template<class ConstBufferSequence>
void
stream<NextLayer>::
write_frame(bool fin, ConstBufferSequence const& buffers)
{
    static_assert(is_SyncStream<next_layer_type>::value,
        "SyncStream requirements not met");
    static_assert(beast::is_ConstBufferSequence<
        ConstBufferSequence>::value,
        "ConstBufferSequence requirements not met");
    error_code ec;
    write_frame(fin, buffers, ec);
    if(ec)
        throw system_error{ec};
}

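// Synchronous implementation. This mirrors the asynchronous state
// machine above: one path for permessage-deflate, then the no-mask
// and mask cases each crossed with automatic fragmentation.
// Client-to-server frames are masked, as required by RFC 6455.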
template<class NextLayer>
template<class ConstBufferSequence>
void
stream<NextLayer>::
write_frame(bool fin,
    ConstBufferSequence const& buffers, error_code& ec)
{
    static_assert(is_SyncStream<next_layer_type>::value,
        "SyncStream requirements not met");
    static_assert(beast::is_ConstBufferSequence<
        ConstBufferSequence>::value,
        "ConstBufferSequence requirements not met");
    using beast::detail::clamp;
    using boost::asio::buffer;
    using boost::asio::buffer_copy;
    using boost::asio::buffer_size;
    detail::frame_header fh;
    if(! wr_.cont)
    {
        wr_begin();
        fh.rsv1 = wr_.compress;
    }
    else
    {
        fh.rsv1 = false;
    }
    fh.rsv2 = false;
    fh.rsv3 = false;
    fh.op = wr_.cont ? opcode::cont : wr_opcode_;
    fh.mask = role_ == detail::role_type::client;
    auto remain = buffer_size(buffers);
    if(wr_.compress)
    {
        consuming_buffers<
            ConstBufferSequence> cb{buffers};
        for(;;)
        {
            auto b = buffer(
                wr_.buf.get(), wr_.buf_size);
            auto const more = detail::deflate(
                pmd_->zo, b, cb, fin, ec);
            failed_ = ec != 0;
            if(failed_)
                return;
            auto const n = buffer_size(b);
            if(n == 0)
            {
                // The input was consumed, but there
                // is no output due to compression
                // latency.
                BOOST_ASSERT(! fin);
                BOOST_ASSERT(buffer_size(cb) == 0);
                fh.fin = false;
                break;
            }
            if(fh.mask)
            {
                fh.key = maskgen_();
                detail::prepared_key key;
                detail::prepare_key(key, fh.key);
                detail::mask_inplace(b, key);
            }
            fh.fin = ! more;
            fh.len = n;
            detail::fh_streambuf fh_buf;
            detail::write<static_streambuf>(fh_buf, fh);
            wr_.cont = ! fin;
            boost::asio::write(stream_,
                buffer_cat(fh_buf.data(), b), ec);
            failed_ = ec != 0;
            if(failed_)
                return;
            if(! more)
                break;
            fh.op = opcode::cont;
            fh.rsv1 = false;
        }
        if(fh.fin && (
            (role_ == detail::role_type::client &&
                pmd_config_.client_no_context_takeover) ||
            (role_ == detail::role_type::server &&
                pmd_config_.server_no_context_takeover)))
            pmd_->zo.reset();
        return;
    }
    if(! fh.mask)
    {
        if(! wr_.autofrag)
        {
            // no mask, no autofrag
            fh.fin = fin;
            fh.len = remain;
            detail::fh_streambuf fh_buf;
            detail::write<static_streambuf>(fh_buf, fh);
            wr_.cont = ! fin;
            boost::asio::write(stream_,
                buffer_cat(fh_buf.data(), buffers), ec);
            failed_ = ec != 0;
            if(failed_)
                return;
        }
        else
        {
            // no mask, autofrag
            BOOST_ASSERT(wr_.buf_size != 0);
            consuming_buffers<
                ConstBufferSequence> cb{buffers};
            for(;;)
            {
                auto const n = clamp(remain, wr_.buf_size);
                remain -= n;
                fh.len = n;
                fh.fin = fin ? remain == 0 : false;
                detail::fh_streambuf fh_buf;
                detail::write<static_streambuf>(fh_buf, fh);
                wr_.cont = ! fin;
                boost::asio::write(stream_,
                    buffer_cat(fh_buf.data(),
                        prepare_buffers(n, cb)), ec);
                failed_ = ec != 0;
                if(failed_)
                    return;
                if(remain == 0)
                    break;
                fh.op = opcode::cont;
                cb.consume(n);
            }
        }
        return;
    }
    if(! wr_.autofrag)
    {
        // mask, no autofrag
        fh.fin = fin;
        fh.len = remain;
        fh.key = maskgen_();
        detail::prepared_key key;
        detail::prepare_key(key, fh.key);
        detail::fh_streambuf fh_buf;
        detail::write<static_streambuf>(fh_buf, fh);
        consuming_buffers<
            ConstBufferSequence> cb{buffers};
        {
            auto const n = clamp(remain, wr_.buf_size);
            auto const b = buffer(wr_.buf.get(), n);
            buffer_copy(b, cb);
            cb.consume(n);
            remain -= n;
            detail::mask_inplace(b, key);
            wr_.cont = ! fin;
            boost::asio::write(stream_,
                buffer_cat(fh_buf.data(), b), ec);
            failed_ = ec != 0;
            if(failed_)
                return;
        }
        while(remain > 0)
        {
            auto const n = clamp(remain, wr_.buf_size);
            auto const b = buffer(wr_.buf.get(), n);
            buffer_copy(b, cb);
            cb.consume(n);
            remain -= n;
            detail::mask_inplace(b, key);
            boost::asio::write(stream_, b, ec);
            failed_ = ec != 0;
            if(failed_)
                return;
        }
        return;
    }
    {
        // mask, autofrag
        BOOST_ASSERT(wr_.buf_size != 0);
        consuming_buffers<
            ConstBufferSequence> cb{buffers};
        for(;;)
        {
            fh.key = maskgen_();
            detail::prepared_key key;
            detail::prepare_key(key, fh.key);
            auto const n = clamp(remain, wr_.buf_size);
            auto const b = buffer(wr_.buf.get(), n);
            buffer_copy(b, cb);
            detail::mask_inplace(b, key);
            fh.len = n;
            remain -= n;
            fh.fin = fin ? remain == 0 : false;
            wr_.cont = ! fh.fin;
            detail::fh_streambuf fh_buf;
            detail::write<static_streambuf>(fh_buf, fh);
            boost::asio::write(stream_,
                buffer_cat(fh_buf.data(), b), ec);
            failed_ = ec != 0;
            if(failed_)
                return;
            if(remain == 0)
                break;
            fh.op = opcode::cont;
            cb.consume(n);
        }
        return;
    }
}

//------------------------------------------------------------------------------

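// Composed asynchronous operation which writes a complete message,
// implementing async_write by delegating to async_write_frame.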
template<class NextLayer>
template<class Buffers, class Handler>
class stream<NextLayer>::write_op
{
    struct data : op
    {
        bool cont;
        stream<NextLayer>& ws;
        consuming_buffers<Buffers> cb;
        std::size_t remain;
        int state = 0;

        data(Handler& handler, stream<NextLayer>& ws_,
                Buffers const& bs)
            : cont(beast_asio_helpers::
                is_continuation(handler))
            , ws(ws_)
            , cb(bs)
            , remain(boost::asio::buffer_size(cb))
        {
        }
    };

    handler_ptr<data, Handler> d_;

public:
    write_op(write_op&&) = default;
    write_op(write_op const&) = default;

    template<class DeducedHandler, class... Args>
    explicit
    write_op(DeducedHandler&& h,
            stream<NextLayer>& ws, Args&&... args)
        : d_(std::forward<DeducedHandler>(h),
            ws, std::forward<Args>(args)...)
    {
        (*this)(error_code{}, false);
    }

    void operator()(error_code ec, bool again = true);

    friend
    void* asio_handler_allocate(
        std::size_t size, write_op* op)
    {
        return beast_asio_helpers::
            allocate(size, op->d_.handler());
    }

    friend
    void asio_handler_deallocate(
        void* p, std::size_t size, write_op* op)
    {
        return beast_asio_helpers::
            deallocate(p, size, op->d_.handler());
    }

    friend
    bool asio_handler_is_continuation(write_op* op)
    {
        return op->d_->cont;
    }

    template<class Function>
    friend
    void asio_handler_invoke(Function&& f, write_op* op)
    {
        return beast_asio_helpers::
            invoke(f, op->d_.handler());
    }
};

template<class NextLayer>
template<class Buffers, class Handler>
void
stream<NextLayer>::
write_op<Buffers, Handler>::
operator()(error_code ec, bool again)
{
    auto& d = *d_;
    d.cont = d.cont || again;
    if(! ec)
    {
        switch(d.state)
        {
        case 0:
        {
            auto const n = d.remain;
            d.remain -= n;
            auto const fin = d.remain <= 0;
            if(fin)
                d.state = 99;
            auto const pb = prepare_buffers(n, d.cb);
            d.cb.consume(n);
            d.ws.async_write_frame(fin, pb, std::move(*this));
            return;
        }

        case 99:
            break;
        }
    }
    d_.invoke(ec);
}

template<class NextLayer>
template<class ConstBufferSequence, class WriteHandler>
typename async_completion<
    WriteHandler, void(error_code)>::result_type
stream<NextLayer>::
async_write(ConstBufferSequence const& bs, WriteHandler&& handler)
{
    static_assert(is_AsyncStream<next_layer_type>::value,
        "AsyncStream requirements not met");
    static_assert(beast::is_ConstBufferSequence<
        ConstBufferSequence>::value,
        "ConstBufferSequence requirements not met");
    beast::async_completion<
        WriteHandler, void(error_code)> completion{handler};
    write_op<ConstBufferSequence, decltype(completion.handler)>{
        completion.handler, *this, bs};
    return completion.result.get();
}
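
// A minimal usage sketch for async_write; `ws` and `s` are
// hypothetical, setup/handshake are elided, and `s` must remain
// valid until the handler is invoked:
//
//     std::string const s = "Hello, world!";
//     ws.async_write(boost::asio::buffer(s),
//         [](beast::error_code ec)
//         {
//             // One complete message was sent, or ec is set.
//         });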

template<class NextLayer>
template<class ConstBufferSequence>
void
stream<NextLayer>::
write(ConstBufferSequence const& buffers)
{
    static_assert(is_SyncStream<next_layer_type>::value,
        "SyncStream requirements not met");
    static_assert(beast::is_ConstBufferSequence<
        ConstBufferSequence>::value,
        "ConstBufferSequence requirements not met");
    error_code ec;
    write(buffers, ec);
    if(ec)
        throw system_error{ec};
}

template<class NextLayer>
template<class ConstBufferSequence>
void
stream<NextLayer>::
write(ConstBufferSequence const& buffers, error_code& ec)
{
    static_assert(is_SyncStream<next_layer_type>::value,
        "SyncStream requirements not met");
    static_assert(beast::is_ConstBufferSequence<
        ConstBufferSequence>::value,
        "ConstBufferSequence requirements not met");
    write_frame(true, buffers, ec);
}
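
// Synchronous counterpart, a sketch with the same hypothetical `ws`:
//
//     std::string const s = "Hello, world!";
//     ws.write(boost::asio::buffer(s)); // throws system_error on failure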

} // websocket
} // beast

#endif