1 //
2 // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/boostorg/beast
8 //
9 
10 #ifndef BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_HPP
11 #define BOOST_BEAST_WEBSOCKET_IMPL_CLOSE_HPP
12 
13 #include <boost/beast/websocket/teardown.hpp>
14 #include <boost/beast/websocket/detail/mask.hpp>
15 #include <boost/beast/websocket/impl/stream_impl.hpp>
16 #include <boost/beast/core/async_base.hpp>
17 #include <boost/beast/core/flat_static_buffer.hpp>
18 #include <boost/beast/core/stream_traits.hpp>
19 #include <boost/beast/core/detail/bind_continuation.hpp>
20 #include <boost/asio/coroutine.hpp>
21 #include <boost/asio/post.hpp>
22 #include <boost/throw_exception.hpp>
23 #include <memory>
24 
25 namespace boost {
26 namespace beast {
27 namespace websocket {
28 
29 /*  Close the WebSocket Connection
30 
31     This composed operation sends the close frame if it hasn't already
32     been sent, then reads and discards frames until receiving a close
33     frame. Finally it invokes the teardown operation to shut down the
34     underlying connection.
35 */
36 template<class NextLayer, bool deflateSupported>
37 template<class Handler>
38 class stream<NextLayer, deflateSupported>::close_op
39     : public beast::stable_async_base<
40         Handler, beast::executor_type<stream>>
41     , public asio::coroutine
42 {
43     boost::weak_ptr<impl_type> wp_;
44     error_code ev_;
45     detail::frame_buffer& fb_;
46 
47 public:
48     static constexpr int id = 5; // for soft_mutex
49 
50     template<class Handler_>
close_op(Handler_ && h,boost::shared_ptr<impl_type> const & sp,close_reason const & cr)51     close_op(
52         Handler_&& h,
53         boost::shared_ptr<impl_type> const& sp,
54         close_reason const& cr)
55         : stable_async_base<Handler,
56             beast::executor_type<stream>>(
57                 std::forward<Handler_>(h),
58                     sp->stream().get_executor())
59         , wp_(sp)
60         , fb_(beast::allocate_stable<
61             detail::frame_buffer>(*this))
62     {
63         // Serialize the close frame
64         sp->template write_close<
65             flat_static_buffer_base>(fb_, cr);
66         (*this)({}, 0, false);
67     }
68 
69     void
operator ()(error_code ec={},std::size_t bytes_transferred=0,bool cont=true)70     operator()(
71         error_code ec = {},
72         std::size_t bytes_transferred = 0,
73         bool cont = true)
74     {
75         using beast::detail::clamp;
76         auto sp = wp_.lock();
77         if(! sp)
78         {
79             ec = net::error::operation_aborted;
80             return this->complete(cont, ec);
81         }
82         auto& impl = *sp;
83         BOOST_ASIO_CORO_REENTER(*this)
84         {
85             // Acquire the write lock
86             if(! impl.wr_block.try_lock(this))
87             {
88                 BOOST_ASIO_CORO_YIELD
89                 {
90                     BOOST_ASIO_HANDLER_LOCATION((
91                         __FILE__, __LINE__,
92                         "websocket::async_close"));
93 
94                     impl.op_close.emplace(std::move(*this));
95                 }
96                 impl.wr_block.lock(this);
97                 BOOST_ASIO_CORO_YIELD
98                 {
99                     BOOST_ASIO_HANDLER_LOCATION((
100                         __FILE__, __LINE__,
101                         "websocket::async_close"));
102 
103                     net::post(std::move(*this));
104                 }
105                 BOOST_ASSERT(impl.wr_block.is_locked(this));
106             }
107             if(impl.check_stop_now(ec))
108                 goto upcall;
109 
110             // Can't call close twice
111             // TODO return a custom error code
112             BOOST_ASSERT(! impl.wr_close);
113 
114             // Send close frame
115             impl.wr_close = true;
116             impl.change_status(status::closing);
117             impl.update_timer(this->get_executor());
118             BOOST_ASIO_CORO_YIELD
119             {
120                 BOOST_ASIO_HANDLER_LOCATION((
121                     __FILE__, __LINE__,
122                     "websocket::async_close"));
123 
124                 net::async_write(impl.stream(), fb_.data(),
125                     beast::detail::bind_continuation(std::move(*this)));
126             }
127             if(impl.check_stop_now(ec))
128                 goto upcall;
129 
130             if(impl.rd_close)
131             {
132                 // This happens when the read_op gets a close frame
133                 // at the same time close_op is sending the close frame.
134                 // The read_op will be suspended on the write block.
135                 goto teardown;
136             }
137 
138             // Acquire the read lock
139             if(! impl.rd_block.try_lock(this))
140             {
141                 BOOST_ASIO_CORO_YIELD
142                 {
143                     BOOST_ASIO_HANDLER_LOCATION((
144                         __FILE__, __LINE__,
145                         "websocket::async_close"));
146 
147                     impl.op_r_close.emplace(std::move(*this));
148                 }
149                 impl.rd_block.lock(this);
150                 BOOST_ASIO_CORO_YIELD
151                 {
152                     BOOST_ASIO_HANDLER_LOCATION((
153                         __FILE__, __LINE__,
154                         "websocket::async_close"));
155 
156                     net::post(std::move(*this));
157                 }
158                 BOOST_ASSERT(impl.rd_block.is_locked(this));
159                 if(impl.check_stop_now(ec))
160                     goto upcall;
161                 BOOST_ASSERT(! impl.rd_close);
162             }
163 
164             // Read until a receiving a close frame
165             // TODO There should be a timeout on this
166             if(impl.rd_remain > 0)
167                 goto read_payload;
168             for(;;)
169             {
170                 // Read frame header
171                 while(! impl.parse_fh(
172                     impl.rd_fh, impl.rd_buf, ev_))
173                 {
174                     if(ev_)
175                         goto teardown;
176                     BOOST_ASIO_CORO_YIELD
177                     {
178                         BOOST_ASIO_HANDLER_LOCATION((
179                             __FILE__, __LINE__,
180                             "websocket::async_close"));
181 
182                         impl.stream().async_read_some(
183                             impl.rd_buf.prepare(read_size(
184                                 impl.rd_buf, impl.rd_buf.max_size())),
185                             beast::detail::bind_continuation(std::move(*this)));
186                     }
187                     impl.rd_buf.commit(bytes_transferred);
188                     if(impl.check_stop_now(ec))
189                         goto upcall;
190                 }
191                 if(detail::is_control(impl.rd_fh.op))
192                 {
193                     // Discard ping or pong frame
194                     if(impl.rd_fh.op != detail::opcode::close)
195                     {
196                         impl.rd_buf.consume(clamp(impl.rd_fh.len));
197                         continue;
198                     }
199 
200                     // Process close frame
201                     // TODO Should we invoke the control callback?
202                     BOOST_ASSERT(! impl.rd_close);
203                     impl.rd_close = true;
204                     auto const mb = buffers_prefix(
205                         clamp(impl.rd_fh.len),
206                         impl.rd_buf.data());
207                     if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
208                         detail::mask_inplace(mb, impl.rd_key);
209                     detail::read_close(impl.cr, mb, ev_);
210                     if(ev_)
211                         goto teardown;
212                     impl.rd_buf.consume(clamp(impl.rd_fh.len));
213                     goto teardown;
214                 }
215 
216             read_payload:
217                 // Discard message frame
218                 while(impl.rd_buf.size() < impl.rd_remain)
219                 {
220                     impl.rd_remain -= impl.rd_buf.size();
221                     impl.rd_buf.consume(impl.rd_buf.size());
222                     BOOST_ASIO_CORO_YIELD
223                     {
224                         BOOST_ASIO_HANDLER_LOCATION((
225                             __FILE__, __LINE__,
226                             "websocket::async_close"));
227 
228                         impl.stream().async_read_some(
229                             impl.rd_buf.prepare(read_size(
230                                 impl.rd_buf, impl.rd_buf.max_size())),
231                             beast::detail::bind_continuation(std::move(*this)));
232                     }
233                     impl.rd_buf.commit(bytes_transferred);
234                     if(impl.check_stop_now(ec))
235                         goto upcall;
236                 }
237                 BOOST_ASSERT(impl.rd_buf.size() >= impl.rd_remain);
238                 impl.rd_buf.consume(clamp(impl.rd_remain));
239                 impl.rd_remain = 0;
240             }
241 
242         teardown:
243             // Teardown
244             BOOST_ASSERT(impl.wr_block.is_locked(this));
245             using beast::websocket::async_teardown;
246             BOOST_ASIO_CORO_YIELD
247             {
248                 BOOST_ASIO_HANDLER_LOCATION((
249                     __FILE__, __LINE__,
250                     "websocket::async_close"));
251 
252                 async_teardown(impl.role, impl.stream(),
253                     beast::detail::bind_continuation(std::move(*this)));
254             }
255             BOOST_ASSERT(impl.wr_block.is_locked(this));
256             if(ec == net::error::eof)
257             {
258                 // Rationale:
259                 // http://stackoverflow.com/questions/25587403/boost-asio-ssl-async-shutdown-always-finishes-with-an-error
260                 ec = {};
261             }
262             if(! ec)
263                 ec = ev_;
264             if(ec)
265                 impl.change_status(status::failed);
266             else
267                 impl.change_status(status::closed);
268             impl.close();
269 
270         upcall:
271             impl.wr_block.unlock(this);
272             impl.rd_block.try_unlock(this)
273                 && impl.op_r_rd.maybe_invoke();
274             impl.op_rd.maybe_invoke()
275                 || impl.op_idle_ping.maybe_invoke()
276                 || impl.op_ping.maybe_invoke()
277                 || impl.op_wr.maybe_invoke();
278             this->complete(cont, ec);
279         }
280     }
281 };
282 
283 template<class NextLayer, bool deflateSupported>
284 struct stream<NextLayer, deflateSupported>::
285     run_close_op
286 {
287     template<class CloseHandler>
288     void
operator ()boost::beast::websocket::stream::run_close_op289     operator()(
290         CloseHandler&& h,
291         boost::shared_ptr<impl_type> const& sp,
292         close_reason const& cr)
293     {
294         // If you get an error on the following line it means
295         // that your handler does not meet the documented type
296         // requirements for the handler.
297 
298         static_assert(
299             beast::detail::is_invocable<CloseHandler,
300                 void(error_code)>::value,
301             "CloseHandler type requirements not met");
302 
303         close_op<
304             typename std::decay<CloseHandler>::type>(
305                 std::forward<CloseHandler>(h),
306                 sp,
307                 cr);
308     }
309 };
310 
311 //------------------------------------------------------------------------------
312 
313 template<class NextLayer, bool deflateSupported>
314 void
315 stream<NextLayer, deflateSupported>::
close(close_reason const & cr)316 close(close_reason const& cr)
317 {
318     static_assert(is_sync_stream<next_layer_type>::value,
319         "SyncStream type requirements not met");
320     error_code ec;
321     close(cr, ec);
322     if(ec)
323         BOOST_THROW_EXCEPTION(system_error{ec});
324 }
325 
326 template<class NextLayer, bool deflateSupported>
327 void
328 stream<NextLayer, deflateSupported>::
close(close_reason const & cr,error_code & ec)329 close(close_reason const& cr, error_code& ec)
330 {
331     static_assert(is_sync_stream<next_layer_type>::value,
332         "SyncStream type requirements not met");
333     using beast::detail::clamp;
334     auto& impl = *impl_;
335     ec = {};
336     if(impl.check_stop_now(ec))
337         return;
338     BOOST_ASSERT(! impl.rd_close);
339 
340     // Can't call close twice
341     // TODO return a custom error code
342     BOOST_ASSERT(! impl.wr_close);
343 
344     // Send close frame
345     {
346         impl.wr_close = true;
347         impl.change_status(status::closing);
348         detail::frame_buffer fb;
349         impl.template write_close<flat_static_buffer_base>(fb, cr);
350         net::write(impl.stream(), fb.data(), ec);
351         if(impl.check_stop_now(ec))
352             return;
353     }
354 
355     // Read until a receiving a close frame
356     error_code ev;
357     if(impl.rd_remain > 0)
358         goto read_payload;
359     for(;;)
360     {
361         // Read frame header
362         while(! impl.parse_fh(
363             impl.rd_fh, impl.rd_buf, ev))
364         {
365             if(ev)
366             {
367                 // Protocol violation
368                 return do_fail(close_code::none, ev, ec);
369             }
370             impl.rd_buf.commit(impl.stream().read_some(
371                 impl.rd_buf.prepare(read_size(
372                     impl.rd_buf, impl.rd_buf.max_size())), ec));
373             if(impl.check_stop_now(ec))
374                 return;
375         }
376 
377         if(detail::is_control(impl.rd_fh.op))
378         {
379             // Discard ping/pong frame
380             if(impl.rd_fh.op != detail::opcode::close)
381             {
382                 impl.rd_buf.consume(clamp(impl.rd_fh.len));
383                 continue;
384             }
385 
386             // Handle close frame
387             // TODO Should we invoke the control callback?
388             BOOST_ASSERT(! impl.rd_close);
389             impl.rd_close = true;
390             auto const mb = buffers_prefix(
391                 clamp(impl.rd_fh.len),
392                 impl.rd_buf.data());
393             if(impl.rd_fh.len > 0 && impl.rd_fh.mask)
394                 detail::mask_inplace(mb, impl.rd_key);
395             detail::read_close(impl.cr, mb, ev);
396             if(ev)
397             {
398                 // Protocol violation
399                 return do_fail(close_code::none, ev, ec);
400             }
401             impl.rd_buf.consume(clamp(impl.rd_fh.len));
402             break;
403         }
404 
405     read_payload:
406         // Discard message frame
407         while(impl.rd_buf.size() < impl.rd_remain)
408         {
409             impl.rd_remain -= impl.rd_buf.size();
410             impl.rd_buf.consume(impl.rd_buf.size());
411             impl.rd_buf.commit(
412                 impl.stream().read_some(
413                     impl.rd_buf.prepare(
414                         read_size(
415                             impl.rd_buf,
416                             impl.rd_buf.max_size())),
417                     ec));
418             if(impl.check_stop_now(ec))
419                 return;
420         }
421         BOOST_ASSERT(
422             impl.rd_buf.size() >= impl.rd_remain);
423         impl.rd_buf.consume(clamp(impl.rd_remain));
424         impl.rd_remain = 0;
425     }
426     // _Close the WebSocket Connection_
427     do_fail(close_code::none, error::closed, ec);
428     if(ec == error::closed)
429         ec = {};
430 }
431 
432 template<class NextLayer, bool deflateSupported>
433 template<BOOST_BEAST_ASYNC_TPARAM1 CloseHandler>
BOOST_BEAST_ASYNC_RESULT1(CloseHandler)434 BOOST_BEAST_ASYNC_RESULT1(CloseHandler)
435 stream<NextLayer, deflateSupported>::
436 async_close(close_reason const& cr, CloseHandler&& handler)
437 {
438     static_assert(is_async_stream<next_layer_type>::value,
439         "AsyncStream type requirements not met");
440     return net::async_initiate<
441         CloseHandler,
442         void(error_code)>(
443             run_close_op{},
444             handler,
445             impl_,
446             cr);
447 }
448 
449 } // websocket
450 } // beast
451 } // boost
452 
453 #endif
454