1 //
2 // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/boostorg/beast
8 //
9
10 #ifndef BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_HPP
11 #define BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_HPP
12
13 #include <boost/beast/websocket/teardown.hpp>
14 #include <boost/beast/websocket/detail/mask.hpp>
15 #include <boost/beast/websocket/impl/stream_impl.hpp>
16 #include <boost/beast/core/async_base.hpp>
17 #include <boost/beast/core/flat_static_buffer.hpp>
18 #include <boost/beast/core/stream_traits.hpp>
19 #include <boost/beast/core/detail/bind_continuation.hpp>
20 #include <boost/asio/coroutine.hpp>
21 #include <boost/asio/post.hpp>
22 #include <boost/throw_exception.hpp>
23 #include <memory>
24
25 namespace boost {
26 namespace beast {
27 namespace websocket {
28
29 /* Close the WebSocket Connection
30
31 This composed operation sends the close frame if it hasn't already
32 been sent, then reads and discards frames until receiving a close
33 frame. Finally it invokes the teardown operation to shut down the
34 underlying connection.
35 */
36 template<class NextLayer, bool deflateSupported>
37 template<class Handler>
38 class stream<NextLayer, deflateSupported>::close_op
39 : public beast::stable_async_base<
40 Handler, beast::executor_type<stream>>
41 , public asio::coroutine
42 {
43 boost::weak_ptr<impl_type> wp_;
44 error_code ev_;
45 detail::frame_buffer& fb_;
46
47 public:
48 static constexpr int id = 5; // for soft_mutex
49
50 template<class Handler_>
close_op(Handler_ && h,boost::shared_ptr<impl_type> const & sp,close_reason const & cr)51 close_op(
52 Handler_&& h,
53 boost::shared_ptr<impl_type> const& sp,
54 close_reason const& cr)
55 : stable_async_base<Handler,
56 beast::executor_type<stream>>(
57 std::forward<Handler_>(h),
58 sp->stream().get_executor())
59 , wp_(sp)
60 , fb_(beast::allocate_stable<
61 detail::frame_buffer>(*this))
62 {
63 // Serialize the close frame
64 sp->template write_close<
65 flat_static_buffer_base>(fb_, cr);
66 (*this)({}, 0, false);
67 }
68
69 void
operator ()(error_code ec={},std::size_t bytes_transferred=0,bool cont=true)70 operator()(
71 error_code ec = {},
72 std::size_t bytes_transferred = 0,
73 bool cont = true)
74 {
75 using beast::detail::clamp;
76 auto sp = wp_.lock();
77 if(! sp)
78 {
79 ec = net::error::operation_aborted;
80 return this->complete(cont, ec);
81 }
82 auto& impl = *sp;
83 BOOST_ASIO_CORO_REENTER(*this)
84 {
85 // Acquire the write lock
86 if(! impl.wr_block.try_lock(this))
87 {
88 BOOST_ASIO_CORO_YIELD
89 {
90 BOOST_ASIO_HANDLER_LOCATION((
91 __FILE__, __LINE__,
92 "websocket::async_close"));
93
94 impl.op_close.emplace(std::move(*this));
95 }
96 impl.wr_block.lock(this);
97 BOOST_ASIO_CORO_YIELD
98 {
99 BOOST_ASIO_HANDLER_LOCATION((
100 __FILE__, __LINE__,
101 "websocket::async_close"));
102
103 net::post(std::move(*this));
104 }
105 BOOST_ASSERT(impl.wr_block.is_locked(this));
106 }
107 if(impl.check_stop_now(ec))
108 goto upcall;
109
110 // Can't call close twice
111 // TODO return a custom error code
112 BOOST_ASSERT(! impl.wr_close);
113
114 // Send close frame
115 impl.wr_close = true;
116 impl.change_status(status::closing);
117 impl.update_timer(this->get_executor());
118 BOOST_ASIO_CORO_YIELD
119 {
120 BOOST_ASIO_HANDLER_LOCATION((
121 __FILE__, __LINE__,
122 "websocket::async_close"));
123
124 net::async_write(impl.stream(), fb_.data(),
125 beast::detail::bind_continuation(std::move(*this)));
126 }
127 if(impl.check_stop_now(ec))
128 goto upcall;
129
130 if(impl.rd_close)
131 {
132 // This happens when the read_op gets a close frame
133 // at the same time close_op is sending the close frame.
134 // The read_op will be suspended on the write block.
135 goto teardown;
136 }
137
138 // Acquire the read lock
139 if(! impl.rd_block.try_lock(this))
140 {
141 BOOST_ASIO_CORO_YIELD
142 {
143 BOOST_ASIO_HANDLER_LOCATION((
144 __FILE__, __LINE__,
145 "websocket::async_close"));
146
147 impl.op_r_close.emplace(std::move(*this));
148 }
149 impl.rd_block.lock(this);
150 BOOST_ASIO_CORO_YIELD
151 {
152 BOOST_ASIO_HANDLER_LOCATION((
153 __FILE__, __LINE__,
154 "websocket::async_close"));
155
156 net::post(std::move(*this));
157 }
158 BOOST_ASSERT(impl.rd_block.is_locked(this));
159 if(impl.check_stop_now(ec))
160 goto upcall;
161 BOOST_ASSERT(! impl.rd_close);
162 }
163
164 // Read until a receiving a close frame
165 // TODO There should be a timeout on this
166 if(impl.rd_remain > 0)
167 goto read_payload;
168 for(;;)
169 {
170 // Read frame header
171 while(! impl.parse_fh(
172 impl.rd_fh, impl.rd_buf, ev_))
173 {
174 if(ev_)
175 goto teardown;
176 BOOST_ASIO_CORO_YIELD
177 {
178 BOOST_ASIO_HANDLER_LOCATION((
179 __FILE__, __LINE__,
180 "websocket::async_close"));
181
182 impl.stream().async_read_some(
183 impl.rd_buf.prepare(read_size(
184 impl.rd_buf, impl.rd_buf.max_size())),
185 beast::detail::bind_continuation(std::move(*this)));
186 }
187 impl.rd_buf.commit(bytes_transferred);
188 if(impl.check_stop_now(ec))
189 goto upcall;
190 }
191 if(detail::is_control(impl.rd_fh.op))
192 {
193 // Discard ping or pong frame
194 if(impl.rd_fh.op != detail::opcode::close)
195 {
196 impl.rd_buf.consume(clamp(impl.rd_fh.len));
197 continue;
198 }
199
200 // Process close frame
201 // TODO Should we invoke the control callback?
202 BOOST_ASSERT(! impl.rd_close);
203 impl.rd_close = true;
204 auto const mb = buffers_prefix(
205 clamp(impl.rd_fh.len),
206 impl.rd_buf.data());
207 if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
208 detail::mask_inplace(mb, impl.rd_key);
209 detail::read_close(impl.cr, mb, ev_);
210 if(ev_)
211 goto teardown;
212 impl.rd_buf.consume(clamp(impl.rd_fh.len));
213 goto teardown;
214 }
215
216 read_payload:
217 // Discard message frame
218 while(impl.rd_buf.size() < impl.rd_remain)
219 {
220 impl.rd_remain -= impl.rd_buf.size();
221 impl.rd_buf.consume(impl.rd_buf.size());
222 BOOST_ASIO_CORO_YIELD
223 {
224 BOOST_ASIO_HANDLER_LOCATION((
225 __FILE__, __LINE__,
226 "websocket::async_close"));
227
228 impl.stream().async_read_some(
229 impl.rd_buf.prepare(read_size(
230 impl.rd_buf, impl.rd_buf.max_size())),
231 beast::detail::bind_continuation(std::move(*this)));
232 }
233 impl.rd_buf.commit(bytes_transferred);
234 if(impl.check_stop_now(ec))
235 goto upcall;
236 }
237 BOOST_ASSERT(impl.rd_buf.size() >= impl.rd_remain);
238 impl.rd_buf.consume(clamp(impl.rd_remain));
239 impl.rd_remain = 0;
240 }
241
242 teardown:
243 // Teardown
244 BOOST_ASSERT(impl.wr_block.is_locked(this));
245 using beast::websocket::async_teardown;
246 BOOST_ASIO_CORO_YIELD
247 {
248 BOOST_ASIO_HANDLER_LOCATION((
249 __FILE__, __LINE__,
250 "websocket::async_close"));
251
252 async_teardown(impl.role, impl.stream(),
253 beast::detail::bind_continuation(std::move(*this)));
254 }
255 BOOST_ASSERT(impl.wr_block.is_locked(this));
256 if(ec == net::error::eof)
257 {
258 // Rationale:
259 // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
260 ec = {};
261 }
262 if(! ec)
263 ec = ev_;
264 if(ec)
265 impl.change_status(status::failed);
266 else
267 impl.change_status(status::closed);
268 impl.close();
269
270 upcall:
271 impl.wr_block.unlock(this);
272 impl.rd_block.try_unlock(this)
273 && impl.op_r_rd.maybe_invoke();
274 impl.op_rd.maybe_invoke()
275 || impl.op_idle_ping.maybe_invoke()
276 || impl.op_ping.maybe_invoke()
277 || impl.op_wr.maybe_invoke();
278 this->complete(cont, ec);
279 }
280 }
281 };
282
283 template<class NextLayer, bool deflateSupported>
284 struct stream<NextLayer, deflateSupported>::
285 run_close_op
286 {
287 template<class CloseHandler>
288 void
operator ()boost::beast::websocket::stream::run_close_op289 operator()(
290 CloseHandler&& h,
291 boost::shared_ptr<impl_type> const& sp,
292 close_reason const& cr)
293 {
294 // If you get an error on the following line it means
295 // that your handler does not meet the documented type
296 // requirements for the handler.
297
298 static_assert(
299 beast::detail::is_invocable<CloseHandler,
300 void(error_code)>::value,
301 "CloseHandler type requirements not met");
302
303 close_op<
304 typename std::decay<CloseHandler>::type>(
305 std::forward<CloseHandler>(h),
306 sp,
307 cr);
308 }
309 };
310
311 //------------------------------------------------------------------------------
312
313 template<class NextLayer, bool deflateSupported>
314 void
315 stream<NextLayer, deflateSupported>::
close(close_reason const & cr)316 close(close_reason const& cr)
317 {
318 static_assert(is_sync_stream<next_layer_type>::value,
319 "SyncStream type requirements not met");
320 error_code ec;
321 close(cr, ec);
322 if(ec)
323 BOOST_THROW_EXCEPTION(system_error{ec});
324 }
325
326 template<class NextLayer, bool deflateSupported>
327 void
328 stream<NextLayer, deflateSupported>::
close(close_reason const & cr,error_code & ec)329 close(close_reason const& cr, error_code& ec)
330 {
331 static_assert(is_sync_stream<next_layer_type>::value,
332 "SyncStream type requirements not met");
333 using beast::detail::clamp;
334 auto& impl = *impl_;
335 ec = {};
336 if(impl.check_stop_now(ec))
337 return;
338 BOOST_ASSERT(! impl.rd_close);
339
340 // Can't call close twice
341 // TODO return a custom error code
342 BOOST_ASSERT(! impl.wr_close);
343
344 // Send close frame
345 {
346 impl.wr_close = true;
347 impl.change_status(status::closing);
348 detail::frame_buffer fb;
349 impl.template write_close<flat_static_buffer_base>(fb, cr);
350 net::write(impl.stream(), fb.data(), ec);
351 if(impl.check_stop_now(ec))
352 return;
353 }
354
355 // Read until a receiving a close frame
356 error_code ev;
357 if(impl.rd_remain > 0)
358 goto read_payload;
359 for(;;)
360 {
361 // Read frame header
362 while(! impl.parse_fh(
363 impl.rd_fh, impl.rd_buf, ev))
364 {
365 if(ev)
366 {
367 // Protocol violation
368 return do_fail(close_code::none, ev, ec);
369 }
370 impl.rd_buf.commit(impl.stream().read_some(
371 impl.rd_buf.prepare(read_size(
372 impl.rd_buf, impl.rd_buf.max_size())), ec));
373 if(impl.check_stop_now(ec))
374 return;
375 }
376
377 if(detail::is_control(impl.rd_fh.op))
378 {
379 // Discard ping/pong frame
380 if(impl.rd_fh.op != detail::opcode::close)
381 {
382 impl.rd_buf.consume(clamp(impl.rd_fh.len));
383 continue;
384 }
385
386 // Handle close frame
387 // TODO Should we invoke the control callback?
388 BOOST_ASSERT(! impl.rd_close);
389 impl.rd_close = true;
390 auto const mb = buffers_prefix(
391 clamp(impl.rd_fh.len),
392 impl.rd_buf.data());
393 if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
394 detail::mask_inplace(mb, impl.rd_key);
395 detail::read_close(impl.cr, mb, ev);
396 if(ev)
397 {
398 // Protocol violation
399 return do_fail(close_code::none, ev, ec);
400 }
401 impl.rd_buf.consume(clamp(impl.rd_fh.len));
402 break;
403 }
404
405 read_payload:
406 // Discard message frame
407 while(impl.rd_buf.size() < impl.rd_remain)
408 {
409 impl.rd_remain -= impl.rd_buf.size();
410 impl.rd_buf.consume(impl.rd_buf.size());
411 impl.rd_buf.commit(
412 impl.stream().read_some(
413 impl.rd_buf.prepare(
414 read_size(
415 impl.rd_buf,
416 impl.rd_buf.max_size())),
417 ec));
418 if(impl.check_stop_now(ec))
419 return;
420 }
421 BOOST_ASSERT(
422 impl.rd_buf.size() >= impl.rd_remain);
423 impl.rd_buf.consume(clamp(impl.rd_remain));
424 impl.rd_remain = 0;
425 }
426 // _Close the WebSocket Connection_
427 do_fail(close_code::none, error::closed, ec);
428 if(ec == error::closed)
429 ec = {};
430 }
431
432 template<class NextLayer, bool deflateSupported>
433 template<BOOST_BEAST_ASYNC_TPARAM1 CloseHandler>
BOOST_BEAST_ASYNC_RESULT1(CloseHandler)434 BOOST_BEAST_ASYNC_RESULT1(CloseHandler)
435 stream<NextLayer, deflateSupported>::
436 async_close(close_reason const& cr, CloseHandler&& handler)
437 {
438 static_assert(is_async_stream<next_layer_type>::value,
439 "AsyncStream type requirements not met");
440 return net::async_initiate<
441 CloseHandler,
442 void(error_code)>(
443 run_close_op{},
444 handler,
445 impl_,
446 cr);
447 }
448
449 } // websocket
450 } // beast
451 } // boost
452
453 #endif
454