Source: git.proxmox.com Git mirror — ceph.git, blob ceph/src/boost/boost/beast/websocket/impl/close.ipp
Snapshot commit: "update sources to v12.2.3"
Path: ceph.git / ceph / src / boost / boost / beast / websocket / impl / close.ipp
1 //
2 // Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/boostorg/beast
8 //
9
10 #ifndef BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_IPP
11 #define BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_IPP
12
13 #include <boost/beast/websocket/teardown.hpp>
14 #include <boost/beast/core/handler_ptr.hpp>
15 #include <boost/beast/core/flat_static_buffer.hpp>
16 #include <boost/beast/core/type_traits.hpp>
17 #include <boost/beast/core/detail/config.hpp>
18 #include <boost/asio/associated_allocator.hpp>
19 #include <boost/asio/associated_executor.hpp>
20 #include <boost/asio/coroutine.hpp>
21 #include <boost/asio/handler_continuation_hook.hpp>
22 #include <boost/asio/post.hpp>
23 #include <boost/throw_exception.hpp>
24 #include <memory>
25
26 namespace boost {
27 namespace beast {
28 namespace websocket {
29
30 /* Close the WebSocket Connection
31
32 This composed operation sends the close frame if it hasn't already
33 been sent, then reads and discards frames until receiving a close
34 frame. Finally it invokes the teardown operation to shut down the
35 underlying connection.
36 */
/*  Composed-operation state for stream::async_close.

    Carries the final handler plus per-operation state. The state is
    allocated through handler_ptr so its storage comes from the
    handler's associated allocator, and the Networking-TS handler
    hooks (allocator, executor, continuation) are forwarded to the
    wrapped handler.
*/
template<class NextLayer>
template<class Handler>
class stream<NextLayer>::close_op
    : public boost::asio::coroutine
{
    struct state
    {
        stream<NextLayer>& ws;      // the stream being closed
        detail::frame_buffer fb;    // holds the serialized close frame
        error_code ev;              // deferred error, reported after teardown
        token tok;                  // identifies this op when holding rd/wr blocks
        bool cont = false;          // true once invoked as a continuation

        state(
            Handler&,
            stream<NextLayer>& ws_,
            close_reason const& cr)
            : ws(ws_)
            , tok(ws.tok_.unique())
        {
            // Serialize the close frame now, while the caller's
            // close_reason is still alive.
            ws.template write_close<
                flat_static_buffer_base>(fb, cr);
        }
    };

    handler_ptr<state, Handler> d_;

public:
    close_op(close_op&&) = default;
    close_op(close_op const&) = default;

    template<class DeducedHandler>
    close_op(
        DeducedHandler&& h,
        stream<NextLayer>& ws,
        close_reason const& cr)
        : d_(std::forward<DeducedHandler>(h), ws, cr)
    {
    }

    // Allocator hook: use the final handler's associated allocator.
    using allocator_type =
        boost::asio::associated_allocator_t<Handler>;

    allocator_type
    get_allocator() const noexcept
    {
        return boost::asio::get_associated_allocator(d_.handler());
    }

    // Executor hook: the handler's associated executor, with the
    // stream's executor as the fallback.
    using executor_type = boost::asio::associated_executor_t<
        Handler, decltype(std::declval<stream<NextLayer>&>().get_executor())>;

    executor_type
    get_executor() const noexcept
    {
        return boost::asio::get_associated_executor(
            d_.handler(), d_->ws.get_executor());
    }

    void
    operator()(
        error_code ec = {},
        std::size_t bytes_transferred = 0,
        bool cont = true);

    // Continuation hook: this op counts as a continuation once it has
    // been re-entered, or when the final handler itself is one.
    friend
    bool asio_handler_is_continuation(close_op* op)
    {
        using boost::asio::asio_handler_is_continuation;
        return op->d_->cont || asio_handler_is_continuation(
            std::addressof(op->d_.handler()));
    }
};
111
/*  Coroutine body of the close composed operation.

    State machine (stackless coroutine, re-entered on each completion):
      1. Acquire the write block, suspending (paused_close_) if another
         operation holds it.
      2. Send the serialized close frame; status_ becomes closing.
      3. Unless a close frame was already received, acquire the read
         block and drain incoming frames until the peer's close frame
         arrives or a protocol error occurs.
      4. Tear down the underlying connection, then release the blocks,
         resume any suspended operations, and invoke the handler.
*/
template<class NextLayer>
template<class Handler>
void
stream<NextLayer>::
close_op<Handler>::
operator()(
    error_code ec,
    std::size_t bytes_transferred,
    bool cont)
{
    using beast::detail::clamp;
    auto& d = *d_;
    close_code code{};
    d.cont = cont;
    BOOST_ASIO_CORO_REENTER(*this)
    {
        // Maybe suspend
        if(! d.ws.wr_block_)
        {
            // Acquire the write block
            d.ws.wr_block_ = d.tok;

            // Make sure the stream is open
            if(! d.ws.check_open(ec))
                goto upcall;
        }
        else
        {
            // Suspend until the current owner releases the write block
            BOOST_ASSERT(d.ws.wr_block_ != d.tok);
            BOOST_ASIO_CORO_YIELD
            d.ws.paused_close_.emplace(std::move(*this));

            // Acquire the write block
            BOOST_ASSERT(! d.ws.wr_block_);
            d.ws.wr_block_ = d.tok;

            // Resume on the stream's executor, not inline from the
            // releasing operation's frame
            BOOST_ASIO_CORO_YIELD
            boost::asio::post(
                d.ws.get_executor(), std::move(*this));
            BOOST_ASSERT(d.ws.wr_block_ == d.tok);

            // Make sure the stream is open
            if(! d.ws.check_open(ec))
                goto upcall;
        }

        // Can't call close twice
        BOOST_ASSERT(! d.ws.wr_close_);

        // Change status to closing
        BOOST_ASSERT(d.ws.status_ == status::open);
        d.ws.status_ = status::closing;

        // Send close frame
        d.ws.wr_close_ = true;
        BOOST_ASIO_CORO_YIELD
        boost::asio::async_write(d.ws.stream_,
            d.fb.data(), std::move(*this));
        if(! d.ws.check_ok(ec))
            goto upcall;

        if(d.ws.rd_close_)
        {
            // This happens when the read_op gets a close frame
            // at the same time close_op is sending the close frame.
            // The read_op will be suspended on the write block.
            goto teardown;
        }

        // Maybe suspend
        if(! d.ws.rd_block_)
        {
            // Acquire the read block
            d.ws.rd_block_ = d.tok;
        }
        else
        {
            // Suspend until the pending read releases the read block
            BOOST_ASSERT(d.ws.rd_block_ != d.tok);
            BOOST_ASIO_CORO_YIELD
            d.ws.paused_r_close_.emplace(std::move(*this));

            // Acquire the read block
            BOOST_ASSERT(! d.ws.rd_block_);
            d.ws.rd_block_ = d.tok;

            // Resume
            BOOST_ASIO_CORO_YIELD
            boost::asio::post(
                d.ws.get_executor(), std::move(*this));
            BOOST_ASSERT(d.ws.rd_block_ == d.tok);

            // Make sure the stream is open
            BOOST_ASSERT(d.ws.status_ != status::open);
            BOOST_ASSERT(d.ws.status_ != status::closed);
            if( d.ws.status_ == status::failed)
                goto upcall;

            BOOST_ASSERT(! d.ws.rd_close_);
        }

        // Drain: read and discard frames until the peer's close frame.
        // If a frame payload was partially read, finish discarding it.
        if(d.ws.rd_remain_ > 0)
            goto read_payload;
        for(;;)
        {
            // Read frame header
            while(! d.ws.parse_fh(
                d.ws.rd_fh_, d.ws.rd_buf_, code))
            {
                if(code != close_code::none)
                {
                    // Malformed header; fail after teardown
                    d.ev = error::failed;
                    goto teardown;
                }
                BOOST_ASIO_CORO_YIELD
                d.ws.stream_.async_read_some(
                    d.ws.rd_buf_.prepare(read_size(d.ws.rd_buf_,
                    d.ws.rd_buf_.max_size())),
                    std::move(*this));
                if(! d.ws.check_ok(ec))
                    goto upcall;
                d.ws.rd_buf_.commit(bytes_transferred);
            }
            if(detail::is_control(d.ws.rd_fh_.op))
            {
                // Process control frame
                if(d.ws.rd_fh_.op == detail::opcode::close)
                {
                    BOOST_ASSERT(! d.ws.rd_close_);
                    d.ws.rd_close_ = true;
                    auto const mb = buffers_prefix(
                        clamp(d.ws.rd_fh_.len),
                        d.ws.rd_buf_.data());
                    // Unmask the payload before parsing the close reason
                    if(d.ws.rd_fh_.len > 0 && d.ws.rd_fh_.mask)
                        detail::mask_inplace(mb, d.ws.rd_key_);
                    detail::read_close(d.ws.cr_, mb, code);
                    if(code != close_code::none)
                    {
                        // Protocol error
                        d.ev = error::failed;
                        goto teardown;
                    }
                    d.ws.rd_buf_.consume(clamp(d.ws.rd_fh_.len));
                    goto teardown;
                }
                // Other control frames (ping/pong) are discarded
                d.ws.rd_buf_.consume(clamp(d.ws.rd_fh_.len));
            }
            else
            {
            read_payload:
                // Discard this frame's payload
                while(d.ws.rd_buf_.size() < d.ws.rd_remain_)
                {
                    d.ws.rd_remain_ -= d.ws.rd_buf_.size();
                    d.ws.rd_buf_.consume(d.ws.rd_buf_.size());
                    BOOST_ASIO_CORO_YIELD
                    d.ws.stream_.async_read_some(
                        d.ws.rd_buf_.prepare(read_size(d.ws.rd_buf_,
                        d.ws.rd_buf_.max_size())),
                        std::move(*this));
                    if(! d.ws.check_ok(ec))
                        goto upcall;
                    d.ws.rd_buf_.commit(bytes_transferred);
                }
                BOOST_ASSERT(d.ws.rd_buf_.size() >= d.ws.rd_remain_);
                d.ws.rd_buf_.consume(clamp(d.ws.rd_remain_));
                d.ws.rd_remain_ = 0;
            }
        }

    teardown:
        // Teardown the underlying connection
        BOOST_ASSERT(d.ws.wr_block_ == d.tok);
        using beast::websocket::async_teardown;
        BOOST_ASIO_CORO_YIELD
        async_teardown(d.ws.role_,
            d.ws.stream_, std::move(*this));
        BOOST_ASSERT(d.ws.wr_block_ == d.tok);
        if(ec == boost::asio::error::eof)
        {
            // Rationale:
            // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
            ec.assign(0, ec.category());
        }
        // A protocol error detected earlier takes effect now
        if(! ec)
            ec = d.ev;
        if(ec)
            d.ws.status_ = status::failed;
        else
            d.ws.status_ = status::closed;
        d.ws.close();

    upcall:
        // Release the blocks and resume suspended operations
        BOOST_ASSERT(d.ws.wr_block_ == d.tok);
        d.ws.wr_block_.reset();
        if(d.ws.rd_block_ == d.tok)
        {
            d.ws.rd_block_.reset();
            d.ws.paused_r_rd_.maybe_invoke();
        }
        d.ws.paused_rd_.maybe_invoke() ||
            d.ws.paused_ping_.maybe_invoke() ||
            d.ws.paused_wr_.maybe_invoke();
        if(! d.cont)
        {
            // Completed inline with the initiating call: post the
            // handler so it is never invoked from the initiating function
            auto& ws = d.ws;
            return boost::asio::post(
                ws.stream_.get_executor(),
                bind_handler(d_.release_handler(), ec));
        }
        d_.invoke(ec);
    }
}
327
328 //------------------------------------------------------------------------------
329
330 template<class NextLayer>
331 void
332 stream<NextLayer>::
333 close(close_reason const& cr)
334 {
335 static_assert(is_sync_stream<next_layer_type>::value,
336 "SyncStream requirements not met");
337 error_code ec;
338 close(cr, ec);
339 if(ec)
340 BOOST_THROW_EXCEPTION(system_error{ec});
341 }
342
/*  Send a close frame and close the connection (error_code overload).

    Performs the RFC 6455 closing handshake synchronously: sends the
    close frame, then reads and discards incoming frames until the
    peer's close frame is received, and finally tears down the
    underlying connection via do_fail.

    @param cr The reason for the close.
    @param ec Set to the error, if any occurred. A clean close is
              reported as success (no error).
*/
template<class NextLayer>
void
stream<NextLayer>::
close(close_reason const& cr, error_code& ec)
{
    static_assert(is_sync_stream<next_layer_type>::value,
        "SyncStream requirements not met");
    using beast::detail::clamp;
    ec.assign(0, ec.category());
    // Make sure the stream is open
    if(! check_open(ec))
        return;
    // If rd_close_ is set then we already sent a close
    BOOST_ASSERT(! rd_close_);
    BOOST_ASSERT(! wr_close_);
    wr_close_ = true;
    {
        // Serialize and send the close frame
        detail::frame_buffer fb;
        write_close<flat_static_buffer_base>(fb, cr);
        boost::asio::write(stream_, fb.data(), ec);
    }
    if(! check_ok(ec))
        return;
    status_ = status::closing;
    // Drain the connection: discard frames until the peer's close
    close_code code{};
    if(rd_remain_ > 0)
        goto read_payload;  // finish discarding a partially-read frame
    for(;;)
    {
        // Read frame header
        while(! parse_fh(rd_fh_, rd_buf_, code))
        {
            if(code != close_code::none)
                // Malformed header: protocol failure
                return do_fail(close_code::none,
                    error::failed, ec);
            auto const bytes_transferred =
                stream_.read_some(
                    rd_buf_.prepare(read_size(rd_buf_,
                    rd_buf_.max_size())), ec);
            if(! check_ok(ec))
                return;
            rd_buf_.commit(bytes_transferred);
        }
        if(detail::is_control(rd_fh_.op))
        {
            // Process control frame
            if(rd_fh_.op == detail::opcode::close)
            {
                BOOST_ASSERT(! rd_close_);
                rd_close_ = true;
                auto const mb = buffers_prefix(
                    clamp(rd_fh_.len),
                    rd_buf_.data());
                // Unmask the payload before parsing the close reason
                if(rd_fh_.len > 0 && rd_fh_.mask)
                    detail::mask_inplace(mb, rd_key_);
                detail::read_close(cr_, mb, code);
                if(code != close_code::none)
                {
                    // Protocol error
                    return do_fail(close_code::none,
                        error::failed, ec);
                }
                rd_buf_.consume(clamp(rd_fh_.len));
                break;
            }
            // Other control frames (ping/pong) are discarded
            rd_buf_.consume(clamp(rd_fh_.len));
        }
        else
        {
        read_payload:
            // Discard this frame's payload
            while(rd_buf_.size() < rd_remain_)
            {
                rd_remain_ -= rd_buf_.size();
                rd_buf_.consume(rd_buf_.size());
                auto const bytes_transferred =
                    stream_.read_some(
                        rd_buf_.prepare(read_size(rd_buf_,
                        rd_buf_.max_size())), ec);
                if(! check_ok(ec))
                    return;
                rd_buf_.commit(bytes_transferred);
            }
            BOOST_ASSERT(rd_buf_.size() >= rd_remain_);
            rd_buf_.consume(clamp(rd_remain_));
            rd_remain_ = 0;
        }
    }
    // _Close the WebSocket Connection_
    do_fail(close_code::none, error::closed, ec);
    if(ec == error::closed)
        // error::closed here means the handshake completed cleanly
        ec.assign(0, ec.category());
}
436
437 template<class NextLayer>
438 template<class CloseHandler>
439 BOOST_ASIO_INITFN_RESULT_TYPE(
440 CloseHandler, void(error_code))
441 stream<NextLayer>::
442 async_close(close_reason const& cr, CloseHandler&& handler)
443 {
444 static_assert(is_async_stream<next_layer_type>::value,
445 "AsyncStream requirements not met");
446 boost::asio::async_completion<CloseHandler,
447 void(error_code)> init{handler};
448 close_op<BOOST_ASIO_HANDLER_TYPE(
449 CloseHandler, void(error_code))>{
450 init.completion_handler, *this, cr}(
451 {}, 0, false);
452 return init.result.get();
453 }
454
455 } // websocket
456 } // beast
457 } // boost
458
459 #endif