]> git.proxmox.com Git - ceph.git/blob - ceph/src/boost/boost/beast/websocket/impl/close.ipp
update sources to ceph Nautilus 14.2.1
[ceph.git] / ceph / src / boost / boost / beast / websocket / impl / close.ipp
1 //
2 // Copyright (c) 2016-2017 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/boostorg/beast
8 //
9
10 #ifndef BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_IPP
11 #define BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_IPP
12
13 #include <boost/beast/websocket/teardown.hpp>
14 #include <boost/beast/core/handler_ptr.hpp>
15 #include <boost/beast/core/flat_static_buffer.hpp>
16 #include <boost/beast/core/type_traits.hpp>
17 #include <boost/beast/core/detail/config.hpp>
18 #include <boost/asio/associated_allocator.hpp>
19 #include <boost/asio/associated_executor.hpp>
20 #include <boost/asio/coroutine.hpp>
21 #include <boost/asio/handler_continuation_hook.hpp>
22 #include <boost/asio/handler_invoke_hook.hpp>
23 #include <boost/asio/post.hpp>
24 #include <boost/throw_exception.hpp>
25 #include <memory>
26
27 namespace boost {
28 namespace beast {
29 namespace websocket {
30
31 /* Close the WebSocket Connection
32
33 This composed operation sends the close frame if it hasn't already
34 been sent, then reads and discards frames until receiving a close
35 frame. Finally it invokes the teardown operation to shut down the
36 underlying connection.
37 */
template<class NextLayer, bool deflateSupported>
template<class Handler>
class stream<NextLayer, deflateSupported>::close_op
    : public boost::asio::coroutine
{
    // State shared across intermediate completions. It is allocated
    // using the final handler's associated allocator (via handler_ptr)
    // and is destroyed before the final handler is invoked.
    struct state
    {
        stream<NextLayer, deflateSupported>& ws;
        detail::frame_buffer fb;    // holds the serialized close frame
        error_code ev;              // deferred protocol error found while draining
        bool cont = false;          // true when running as a continuation

        state(
            Handler const&,
            stream<NextLayer, deflateSupported>& ws_,
            close_reason const& cr)
            : ws(ws_)
        {
            // Serialize the close frame
            ws.template write_close<
                flat_static_buffer_base>(fb, cr);
        }
    };

    handler_ptr<state, Handler> d_;

public:
    // Identifies this operation when acquiring/releasing
    // the stream's soft_mutex read/write blocks.
    static constexpr int id = 4; // for soft_mutex

    close_op(close_op&&) = default;
    close_op(close_op const&) = delete;

    template<class DeducedHandler>
    close_op(
        DeducedHandler&& h,
        stream<NextLayer, deflateSupported>& ws,
        close_reason const& cr)
        : d_(std::forward<DeducedHandler>(h), ws, cr)
    {
    }

    // Propagate the final handler's associated allocator
    using allocator_type =
        boost::asio::associated_allocator_t<Handler>;

    allocator_type
    get_allocator() const noexcept
    {
        return (boost::asio::get_associated_allocator)(d_.handler());
    }

    // Propagate the final handler's associated executor,
    // defaulting to the stream's executor.
    using executor_type = boost::asio::associated_executor_t<
        Handler, decltype(std::declval<stream<NextLayer, deflateSupported>&>().get_executor())>;

    executor_type
    get_executor() const noexcept
    {
        return (boost::asio::get_associated_executor)(
            d_.handler(), d_->ws.get_executor());
    }

    // Entry point and intermediate completion handler.
    // `cont` is false only on the initiating call, so the
    // final handler can be posted rather than invoked inline.
    void
    operator()(
        error_code ec = {},
        std::size_t bytes_transferred = 0,
        bool cont = true);

    // Legacy asio hook: report continuation so the io context
    // can avoid an unnecessary context switch.
    friend
    bool asio_handler_is_continuation(close_op* op)
    {
        using boost::asio::asio_handler_is_continuation;
        return op->d_->cont || asio_handler_is_continuation(
            std::addressof(op->d_.handler()));
    }

    // Legacy asio hook: forward invocation through the final
    // handler so composed invocations stay properly wrapped.
    template<class Function>
    friend
    void asio_handler_invoke(Function&& f, close_op* op)
    {
        using boost::asio::asio_handler_invoke;
        asio_handler_invoke(f,
            std::addressof(op->d_.handler()));
    }
};
121
// Coroutine body of the close operation:
// 1. acquire the write block (suspending if another writer holds it),
// 2. send our close frame,
// 3. acquire the read block and drain incoming frames until the
//    peer's close frame arrives,
// 4. tear down the underlying connection,
// 5. release locks, resume any suspended operations, and invoke
//    the final handler.
// NOTE: the gotos jump forward across yield points; this relies on
// the stackless-coroutine switch machinery and must not be reordered.
template<class NextLayer, bool deflateSupported>
template<class Handler>
void
stream<NextLayer, deflateSupported>::
close_op<Handler>::
operator()(
    error_code ec,
    std::size_t bytes_transferred,
    bool cont)
{
    using beast::detail::clamp;
    auto& d = *d_;
    d.cont = cont;
    BOOST_ASIO_CORO_REENTER(*this)
    {
        // Maybe suspend
        if(d.ws.wr_block_.try_lock(this))
        {
            // Make sure the stream is open
            if(! d.ws.check_open(ec))
                goto upcall;
        }
        else
        {
            // Suspend until the current writer finishes
            BOOST_ASIO_CORO_YIELD
            d.ws.paused_close_.emplace(std::move(*this));

            // Acquire the write block
            d.ws.wr_block_.lock(this);

            // Resume (post to avoid running in the resuming
            // operation's stack frame)
            BOOST_ASIO_CORO_YIELD
            boost::asio::post(
                d.ws.get_executor(), std::move(*this));
            BOOST_ASSERT(d.ws.wr_block_.is_locked(this));

            // Make sure the stream is open
            if(! d.ws.check_open(ec))
                goto upcall;
        }

        // Can't call close twice
        BOOST_ASSERT(! d.ws.wr_close_);

        // Change status to closing
        BOOST_ASSERT(d.ws.status_ == status::open);
        d.ws.status_ = status::closing;

        // Send close frame (serialized into d.fb by the ctor)
        d.ws.wr_close_ = true;
        BOOST_ASIO_CORO_YIELD
        boost::asio::async_write(d.ws.stream_,
            d.fb.data(), std::move(*this));
        if(! d.ws.check_ok(ec))
            goto upcall;

        if(d.ws.rd_close_)
        {
            // This happens when the read_op gets a close frame
            // at the same time close_op is sending the close frame.
            // The read_op will be suspended on the write block.
            goto teardown;
        }

        // Maybe suspend
        if(! d.ws.rd_block_.try_lock(this))
        {
            // Suspend until the current reader finishes
            BOOST_ASIO_CORO_YIELD
            d.ws.paused_r_close_.emplace(std::move(*this));

            // Acquire the read block
            d.ws.rd_block_.lock(this);

            // Resume
            BOOST_ASIO_CORO_YIELD
            boost::asio::post(
                d.ws.get_executor(), std::move(*this));
            BOOST_ASSERT(d.ws.rd_block_.is_locked(this));

            // Make sure the stream is open
            BOOST_ASSERT(d.ws.status_ != status::open);
            BOOST_ASSERT(d.ws.status_ != status::closed);
            if( d.ws.status_ == status::failed)
                goto upcall;

            BOOST_ASSERT(! d.ws.rd_close_);
        }

        // Drain: read and discard frames until we see
        // the peer's close frame.
        if(d.ws.rd_remain_ > 0)
            goto read_payload;   // finish a partially-read message first
        for(;;)
        {
            // Read frame header
            while(! d.ws.parse_fh(
                d.ws.rd_fh_, d.ws.rd_buf_, d.ev))
            {
                if(d.ev)
                    goto teardown;   // protocol violation: report after teardown
                BOOST_ASIO_CORO_YIELD
                d.ws.stream_.async_read_some(
                    d.ws.rd_buf_.prepare(read_size(d.ws.rd_buf_,
                        d.ws.rd_buf_.max_size())),
                            std::move(*this));
                if(! d.ws.check_ok(ec))
                    goto upcall;
                d.ws.rd_buf_.commit(bytes_transferred);
            }
            if(detail::is_control(d.ws.rd_fh_.op))
            {
                // Process control frame
                if(d.ws.rd_fh_.op == detail::opcode::close)
                {
                    BOOST_ASSERT(! d.ws.rd_close_);
                    d.ws.rd_close_ = true;
                    auto const mb = buffers_prefix(
                        clamp(d.ws.rd_fh_.len),
                        d.ws.rd_buf_.mutable_data());
                    // Unmask before parsing the close payload
                    if(d.ws.rd_fh_.len > 0 && d.ws.rd_fh_.mask)
                        detail::mask_inplace(mb, d.ws.rd_key_);
                    detail::read_close(d.ws.cr_, mb, d.ev);
                    if(d.ev)
                        goto teardown;
                    d.ws.rd_buf_.consume(clamp(d.ws.rd_fh_.len));
                    goto teardown;
                }
                // Other control frames (ping/pong) are discarded
                d.ws.rd_buf_.consume(clamp(d.ws.rd_fh_.len));
            }
            else
            {
            read_payload:
                // Discard the message payload, reading more as needed
                while(d.ws.rd_buf_.size() < d.ws.rd_remain_)
                {
                    d.ws.rd_remain_ -= d.ws.rd_buf_.size();
                    d.ws.rd_buf_.consume(d.ws.rd_buf_.size());
                    BOOST_ASIO_CORO_YIELD
                    d.ws.stream_.async_read_some(
                        d.ws.rd_buf_.prepare(read_size(d.ws.rd_buf_,
                            d.ws.rd_buf_.max_size())),
                                std::move(*this));
                    if(! d.ws.check_ok(ec))
                        goto upcall;
                    d.ws.rd_buf_.commit(bytes_transferred);
                }
                BOOST_ASSERT(d.ws.rd_buf_.size() >= d.ws.rd_remain_);
                d.ws.rd_buf_.consume(clamp(d.ws.rd_remain_));
                d.ws.rd_remain_ = 0;
            }
        }

    teardown:
        // Teardown the underlying connection
        BOOST_ASSERT(d.ws.wr_block_.is_locked(this));
        using beast::websocket::async_teardown;
        BOOST_ASIO_CORO_YIELD
        async_teardown(d.ws.role_,
            d.ws.stream_, std::move(*this));
        BOOST_ASSERT(d.ws.wr_block_.is_locked(this));
        if(ec == boost::asio::error::eof)
        {
            // Rationale:
            // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
            ec.assign(0, ec.category());
        }
        // A deferred protocol error takes precedence over
        // a clean teardown result
        if(! ec)
            ec = d.ev;
        if(ec)
            d.ws.status_ = status::failed;
        else
            d.ws.status_ = status::closed;
        d.ws.close();

    upcall:
        // Release locks and resume any suspended operations,
        // then deliver the result to the final handler.
        BOOST_ASSERT(d.ws.wr_block_.is_locked(this));
        d.ws.wr_block_.unlock(this);
        if(d.ws.rd_block_.try_unlock(this))
            d.ws.paused_r_rd_.maybe_invoke();
        d.ws.paused_rd_.maybe_invoke() ||
            d.ws.paused_ping_.maybe_invoke() ||
            d.ws.paused_wr_.maybe_invoke();
        if(! d.cont)
        {
            // Initiating call: post to avoid invoking the
            // handler inline from the initiating function.
            auto& ws = d.ws;
            return boost::asio::post(
                ws.stream_.get_executor(),
                bind_handler(d_.release_handler(), ec));
        }
        d_.invoke(ec);
    }
}
314
315 //------------------------------------------------------------------------------
316
317 template<class NextLayer, bool deflateSupported>
318 void
319 stream<NextLayer, deflateSupported>::
320 close(close_reason const& cr)
321 {
322 static_assert(is_sync_stream<next_layer_type>::value,
323 "SyncStream requirements not met");
324 error_code ec;
325 close(cr, ec);
326 if(ec)
327 BOOST_THROW_EXCEPTION(system_error{ec});
328 }
329
// Send a close frame and wait for the peer's close response,
// blocking until complete. This performs the closing handshake:
// write our close frame, drain and discard incoming frames until
// the peer's close frame arrives, then tear down the connection.
// On return, ec holds the result; error::closed is mapped to
// success since it indicates an orderly close.
template<class NextLayer, bool deflateSupported>
void
stream<NextLayer, deflateSupported>::
close(close_reason const& cr, error_code& ec)
{
    static_assert(is_sync_stream<next_layer_type>::value,
        "SyncStream requirements not met");
    using beast::detail::clamp;
    ec.assign(0, ec.category());
    // Make sure the stream is open
    if(! check_open(ec))
        return;
    // If rd_close_ is set then we already sent a close
    BOOST_ASSERT(! rd_close_);
    BOOST_ASSERT(! wr_close_);
    wr_close_ = true;
    {
        // Serialize and send our close frame
        detail::frame_buffer fb;
        write_close<flat_static_buffer_base>(fb, cr);
        boost::asio::write(stream_, fb.data(), ec);
    }
    if(! check_ok(ec))
        return;
    status_ = status::closing;
    error_code result;   // holds deferred protocol errors
    // Drain the connection: discard frames until the
    // peer's close frame is received.
    if(rd_remain_ > 0)
        goto read_payload;   // finish a partially-read message first
    for(;;)
    {
        // Read frame header
        while(! parse_fh(rd_fh_, rd_buf_, result))
        {
            if(result)
                return do_fail(
                    close_code::none, result, ec);
            auto const bytes_transferred =
                stream_.read_some(
                    rd_buf_.prepare(read_size(rd_buf_,
                        rd_buf_.max_size())), ec);
            if(! check_ok(ec))
                return;
            rd_buf_.commit(bytes_transferred);
        }
        if(detail::is_control(rd_fh_.op))
        {
            // Process control frame
            if(rd_fh_.op == detail::opcode::close)
            {
                BOOST_ASSERT(! rd_close_);
                rd_close_ = true;
                auto const mb = buffers_prefix(
                    clamp(rd_fh_.len),
                    rd_buf_.mutable_data());
                // Unmask before parsing the close payload
                if(rd_fh_.len > 0 && rd_fh_.mask)
                    detail::mask_inplace(mb, rd_key_);
                detail::read_close(cr_, mb, result);
                if(result)
                {
                    // Protocol violation
                    return do_fail(
                        close_code::none, result, ec);
                }
                rd_buf_.consume(clamp(rd_fh_.len));
                break;
            }
            // Other control frames (ping/pong) are discarded
            rd_buf_.consume(clamp(rd_fh_.len));
        }
        else
        {
        read_payload:
            // Discard the message payload, reading more as needed
            while(rd_buf_.size() < rd_remain_)
            {
                rd_remain_ -= rd_buf_.size();
                rd_buf_.consume(rd_buf_.size());
                auto const bytes_transferred =
                    stream_.read_some(
                        rd_buf_.prepare(read_size(rd_buf_,
                            rd_buf_.max_size())), ec);
                if(! check_ok(ec))
                    return;
                rd_buf_.commit(bytes_transferred);
            }
            BOOST_ASSERT(rd_buf_.size() >= rd_remain_);
            rd_buf_.consume(clamp(rd_remain_));
            rd_remain_ = 0;
        }
    }
    // _Close the WebSocket Connection_
    do_fail(close_code::none, error::closed, ec);
    if(ec == error::closed)
        ec.assign(0, ec.category());   // orderly close is success
}
423
// Initiate an asynchronous close: send a close frame, drain until
// the peer's close frame arrives, then tear down the connection.
// The handler is invoked with the final error_code. The macro
// boilerplate rebinds the handler type and extracts the initiation
// result per the (pre-async_initiate) Boost.Asio conventions.
template<class NextLayer, bool deflateSupported>
template<class CloseHandler>
BOOST_ASIO_INITFN_RESULT_TYPE(
    CloseHandler, void(error_code))
stream<NextLayer, deflateSupported>::
async_close(close_reason const& cr, CloseHandler&& handler)
{
    static_assert(is_async_stream<next_layer_type>::value,
        "AsyncStream requirements not met");
    BOOST_BEAST_HANDLER_INIT(
        CloseHandler, void(error_code));
    // Start the composed operation; cont=false so the final
    // handler is posted if the operation completes immediately.
    close_op<BOOST_ASIO_HANDLER_TYPE(
        CloseHandler, void(error_code))>{
            std::move(init.completion_handler), *this, cr}(
                {}, 0, false);
    return init.result.get();
}
441
442 } // websocket
443 } // beast
444 } // boost
445
446 #endif