1 //
2 // Copyright (c) 2016-2019 Vinnie Falco (vinnie dot falco at gmail dot com)
3 //
4 // Distributed under the Boost Software License, Version 1.0. (See accompanying
5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // Official repository: https://github.com/boostorg/beast
8 //
9 
10 #ifndef BOOST_BEAST_CORE_IMPL_BASIC_STREAM_HPP
11 #define BOOST_BEAST_CORE_IMPL_BASIC_STREAM_HPP
12 
#include <boost/beast/core/async_base.hpp>
#include <boost/beast/core/buffer_traits.hpp>
#include <boost/beast/core/buffers_prefix.hpp>
#include <boost/beast/websocket/teardown.hpp>
#include <boost/asio/coroutine.hpp>
#include <boost/assert.hpp>
#include <boost/make_shared.hpp>
#include <boost/core/exchange.hpp>
#include <algorithm>
#include <cstdlib>
#include <type_traits>
#include <utility>
24 
25 namespace boost {
26 namespace beast {
27 
28 //------------------------------------------------------------------------------
29 
30 template<class Protocol, class Executor, class RatePolicy>
31 template<class... Args>
32 basic_stream<Protocol, Executor, RatePolicy>::
33 impl_type::
impl_type(std::false_type,Args &&...args)34 impl_type(std::false_type, Args&&... args)
35     : socket(std::forward<Args>(args)...)
36     , read(ex())
37     , write(ex())
38     , timer(ex())
39 {
40     reset();
41 }
42 
43 template<class Protocol, class Executor, class RatePolicy>
44 template<class RatePolicy_, class... Args>
45 basic_stream<Protocol, Executor, RatePolicy>::
46 impl_type::
impl_type(std::true_type,RatePolicy_ && policy,Args &&...args)47 impl_type(std::true_type,
48     RatePolicy_&& policy, Args&&... args)
49     : boost::empty_value<RatePolicy>(
50         boost::empty_init_t{},
51         std::forward<RatePolicy_>(policy))
52     , socket(std::forward<Args>(args)...)
53     , read(ex())
54     , write(ex())
55     , timer(ex())
56 {
57     reset();
58 }
59 
60 template<class Protocol, class Executor, class RatePolicy>
61 template<class Executor2>
62 void
63 basic_stream<Protocol, Executor, RatePolicy>::
64 impl_type::
on_timer(Executor2 const & ex2)65 on_timer(Executor2 const& ex2)
66 {
67     BOOST_ASSERT(waiting > 0);
68 
69     // the last waiter starts the new slice
70     if(--waiting > 0)
71         return;
72 
73     // update the expiration time
74     BOOST_VERIFY(timer.expires_after(
75         std::chrono::seconds(1)) == 0);
76 
77     rate_policy_access::on_timer(policy());
78 
79     struct handler : boost::empty_value<Executor2>
80     {
81         boost::weak_ptr<impl_type> wp;
82 
83         using executor_type = Executor2;
84 
85         executor_type
86         get_executor() const noexcept
87         {
88             return this->get();
89         }
90 
91         handler(
92             Executor2 const& ex2,
93             boost::shared_ptr<impl_type> const& sp)
94             : boost::empty_value<Executor2>(
95                 boost::empty_init_t{}, ex2)
96             , wp(sp)
97         {
98         }
99 
100         void
101         operator()(error_code ec)
102         {
103             auto sp = wp.lock();
104             if(! sp)
105                 return;
106             if(ec == net::error::operation_aborted)
107                 return;
108             BOOST_ASSERT(! ec);
109             if(ec)
110                 return;
111             sp->on_timer(this->get());
112         }
113     };
114 
115     // wait on the timer again
116     ++waiting;
117     timer.async_wait(handler(ex2, this->shared_from_this()));
118 }
119 
120 template<class Protocol, class Executor, class RatePolicy>
121 void
122 basic_stream<Protocol, Executor, RatePolicy>::
123 impl_type::
reset()124 reset()
125 {
126     // If assert goes off, it means that there are
127     // already read or write (or connect) operations
128     // outstanding, so there is nothing to apply
129     // the expiration time to!
130     //
131     BOOST_ASSERT(! read.pending || ! write.pending);
132 
133     if(! read.pending)
134         BOOST_VERIFY(
135             read.timer.expires_at(never()) == 0);
136 
137     if(! write.pending)
138         BOOST_VERIFY(
139             write.timer.expires_at(never()) == 0);
140 }
141 
142 template<class Protocol, class Executor, class RatePolicy>
143 void
144 basic_stream<Protocol, Executor, RatePolicy>::
145 impl_type::
close()146 close() noexcept
147 {
148     {
149         error_code ec;
150         socket.close(ec);
151     }
152     try
153     {
154         timer.cancel();
155     }
156     catch(...)
157     {
158     }
159 }
160 
161 //------------------------------------------------------------------------------
162 
163 template<class Protocol, class Executor, class RatePolicy>
164 template<class Executor2>
165 struct basic_stream<Protocol, Executor, RatePolicy>::
166     timeout_handler
167 {
168     using executor_type = Executor2;
169 
170     op_state& state;
171     boost::weak_ptr<impl_type> wp;
172     tick_type tick;
173     executor_type ex;
174 
get_executorboost::beast::basic_stream::timeout_handler175     executor_type get_executor() const noexcept
176     {
177         return ex;
178     }
179 
180     void
operator ()boost::beast::basic_stream::timeout_handler181     operator()(error_code ec)
182     {
183         // timer canceled
184         if(ec == net::error::operation_aborted)
185             return;
186         BOOST_ASSERT(! ec);
187 
188         auto sp = wp.lock();
189 
190         // stream destroyed
191         if(! sp)
192             return;
193 
194         // stale timer
195         if(tick < state.tick)
196             return;
197         BOOST_ASSERT(tick == state.tick);
198 
199         // timeout
200         BOOST_ASSERT(! state.timeout);
201         sp->close();
202         state.timeout = true;
203     }
204 };
205 
206 //------------------------------------------------------------------------------
207 
208 template<class Protocol, class Executor, class RatePolicy>
209 struct basic_stream<Protocol, Executor, RatePolicy>::ops
210 {
211 
212 template<bool isRead, class Buffers, class Handler>
213 class transfer_op
214     : public async_base<Handler, Executor>
215     , public boost::asio::coroutine
216 {
217     boost::shared_ptr<impl_type> impl_;
218     pending_guard pg_;
219     Buffers b_;
220 
221     using is_read = std::integral_constant<bool, isRead>;
222 
223     op_state&
state()224     state()
225     {
226         if (isRead)
227             return impl_->read;
228         else
229             return impl_->write;
230     }
231 
232     std::size_t
available_bytes()233     available_bytes()
234     {
235         if (isRead)
236             return rate_policy_access::
237                 available_read_bytes(impl_->policy());
238         else
239             return rate_policy_access::
240                 available_write_bytes(impl_->policy());
241     }
242 
243     void
transfer_bytes(std::size_t n)244     transfer_bytes(std::size_t n)
245     {
246         if (isRead)
247             rate_policy_access::
248                 transfer_read_bytes(impl_->policy(), n);
249         else
250             rate_policy_access::
251                 transfer_write_bytes(impl_->policy(), n);
252     }
253 
254     void
async_perform(std::size_t amount,std::true_type)255     async_perform(
256         std::size_t amount, std::true_type)
257     {
258         impl_->socket.async_read_some(
259             beast::buffers_prefix(amount, b_),
260                 std::move(*this));
261     }
262 
263     void
async_perform(std::size_t amount,std::false_type)264     async_perform(
265         std::size_t amount, std::false_type)
266     {
267         impl_->socket.async_write_some(
268             beast::buffers_prefix(amount, b_),
269                 std::move(*this));
270     }
271 
272     static bool never_pending_;
273 
274 public:
275     template<class Handler_>
transfer_op(Handler_ && h,basic_stream & s,Buffers const & b)276     transfer_op(
277         Handler_&& h,
278         basic_stream& s,
279         Buffers const& b)
280         : async_base<Handler, Executor>(
281             std::forward<Handler_>(h), s.get_executor())
282         , impl_(s.impl_)
283         , pg_()
284         , b_(b)
285     {
286         if (buffer_bytes(b_) == 0 && state().pending)
287         {
288             // Workaround:
289             // Corner case discovered in https://github.com/boostorg/beast/issues/2065
290             // Enclosing SSL stream wishes to complete a 0-length write early by
291             // executing a 0-length read against the underlying stream.
292             // This can occur even if an existing async_read is in progress.
293             // In this specific case, we will complete the async op with no error
294             // in order to prevent assertions and/or internal corruption of the basic_stream
295             this->complete(false, error_code(), 0);
296         }
297         else
298         {
299             pg_.assign(state().pending);
300             (*this)({});
301         }
302     }
303 
304     void
operator ()(error_code ec,std::size_t bytes_transferred=0)305     operator()(
306         error_code ec,
307         std::size_t bytes_transferred = 0)
308     {
309         BOOST_ASIO_CORO_REENTER(*this)
310         {
311             // handle empty buffers
312             if(detail::buffers_empty(b_))
313             {
314                 // make sure we perform the no-op
315                 BOOST_ASIO_CORO_YIELD
316                 {
317                     BOOST_ASIO_HANDLER_LOCATION((
318                         __FILE__, __LINE__,
319                         (isRead ? "basic_stream::async_read_some"
320                             : "basic_stream::async_write_some")));
321 
322                     async_perform(0, is_read{});
323                 }
324                 // apply the timeout manually, otherwise
325                 // behavior varies across platforms.
326                 if(state().timer.expiry() <= clock_type::now())
327                 {
328                     impl_->close();
329                     ec = beast::error::timeout;
330                 }
331                 goto upcall;
332             }
333 
334             // if a timeout is active, wait on the timer
335             if(state().timer.expiry() != never())
336             {
337                 BOOST_ASIO_HANDLER_LOCATION((
338                     __FILE__, __LINE__,
339                     (isRead ? "basic_stream::async_read_some"
340                         : "basic_stream::async_write_some")));
341 
342                 state().timer.async_wait(
343                     timeout_handler<decltype(this->get_executor())>{
344                         state(),
345                         impl_,
346                         state().tick,
347                         this->get_executor()});
348             }
349 
350             // check rate limit, maybe wait
351             std::size_t amount;
352             amount = available_bytes();
353             if(amount == 0)
354             {
355                 ++impl_->waiting;
356                 BOOST_ASIO_CORO_YIELD
357                 {
358                     BOOST_ASIO_HANDLER_LOCATION((
359                         __FILE__, __LINE__,
360                         (isRead ? "basic_stream::async_read_some"
361                             : "basic_stream::async_write_some")));
362 
363                     impl_->timer.async_wait(std::move(*this));
364                 }
365                 if(ec)
366                 {
367                     // socket was closed, or a timeout
368                     BOOST_ASSERT(ec ==
369                         net::error::operation_aborted);
370                     // timeout handler invoked?
371                     if(state().timeout)
372                     {
373                         // yes, socket already closed
374                         ec = beast::error::timeout;
375                         state().timeout = false;
376                     }
377                     goto upcall;
378                 }
379                 impl_->on_timer(this->get_executor());
380 
381                 // Allow at least one byte, otherwise
382                 // bytes_transferred could be 0.
383                 amount = std::max<std::size_t>(
384                     available_bytes(), 1);
385             }
386 
387             BOOST_ASIO_CORO_YIELD
388             {
389                 BOOST_ASIO_HANDLER_LOCATION((
390                     __FILE__, __LINE__,
391                     (isRead ? "basic_stream::async_read_some"
392                         : "basic_stream::async_write_some")));
393 
394                 async_perform(amount, is_read{});
395             }
396 
397             if(state().timer.expiry() != never())
398             {
399                 ++state().tick;
400 
401                 // try cancelling timer
402                 auto const n =
403                     state().timer.cancel();
404                 if(n == 0)
405                 {
406                     // timeout handler invoked?
407                     if(state().timeout)
408                     {
409                         // yes, socket already closed
410                         ec = beast::error::timeout;
411                         state().timeout = false;
412                     }
413                 }
414                 else
415                 {
416                     BOOST_ASSERT(n == 1);
417                     BOOST_ASSERT(! state().timeout);
418                 }
419             }
420 
421         upcall:
422             pg_.reset();
423             transfer_bytes(bytes_transferred);
424             this->complete_now(ec, bytes_transferred);
425         }
426     }
427 };
428 
429 template<class Handler>
430 class connect_op
431     : public async_base<Handler, Executor>
432 {
433     boost::shared_ptr<impl_type> impl_;
434     pending_guard pg0_;
435     pending_guard pg1_;
436 
437     op_state&
state()438     state() noexcept
439     {
440         return impl_->write;
441     }
442 
443 public:
444     template<class Handler_>
connect_op(Handler_ && h,basic_stream & s,endpoint_type ep)445     connect_op(
446         Handler_&& h,
447         basic_stream& s,
448         endpoint_type ep)
449         : async_base<Handler, Executor>(
450             std::forward<Handler_>(h), s.get_executor())
451         , impl_(s.impl_)
452         , pg0_(impl_->read.pending)
453         , pg1_(impl_->write.pending)
454     {
455         if(state().timer.expiry() != stream_base::never())
456         {
457             BOOST_ASIO_HANDLER_LOCATION((
458                 __FILE__, __LINE__,
459                 "basic_stream::async_connect"));
460 
461             impl_->write.timer.async_wait(
462                 timeout_handler<decltype(this->get_executor())>{
463                     state(),
464                     impl_,
465                     state().tick,
466                     this->get_executor()});
467         }
468 
469         BOOST_ASIO_HANDLER_LOCATION((
470             __FILE__, __LINE__,
471             "basic_stream::async_connect"));
472 
473         impl_->socket.async_connect(
474             ep, std::move(*this));
475         // *this is now moved-from
476     }
477 
478     template<
479         class Endpoints, class Condition,
480         class Handler_>
connect_op(Handler_ && h,basic_stream & s,Endpoints const & eps,Condition const & cond)481     connect_op(
482         Handler_&& h,
483         basic_stream& s,
484         Endpoints const& eps,
485         Condition const& cond)
486         : async_base<Handler, Executor>(
487             std::forward<Handler_>(h), s.get_executor())
488         , impl_(s.impl_)
489         , pg0_(impl_->read.pending)
490         , pg1_(impl_->write.pending)
491     {
492         if(state().timer.expiry() != stream_base::never())
493         {
494             BOOST_ASIO_HANDLER_LOCATION((
495                 __FILE__, __LINE__,
496                 "basic_stream::async_connect"));
497 
498             impl_->write.timer.async_wait(
499                 timeout_handler<decltype(this->get_executor())>{
500                     state(),
501                     impl_,
502                     state().tick,
503                     this->get_executor()});
504         }
505 
506         BOOST_ASIO_HANDLER_LOCATION((
507             __FILE__, __LINE__,
508             "basic_stream::async_connect"));
509 
510         net::async_connect(impl_->socket,
511             eps, cond, std::move(*this));
512         // *this is now moved-from
513     }
514 
515     template<
516         class Iterator, class Condition,
517         class Handler_>
connect_op(Handler_ && h,basic_stream & s,Iterator begin,Iterator end,Condition const & cond)518     connect_op(
519         Handler_&& h,
520         basic_stream& s,
521         Iterator begin, Iterator end,
522         Condition const& cond)
523         : async_base<Handler, Executor>(
524             std::forward<Handler_>(h), s.get_executor())
525         , impl_(s.impl_)
526         , pg0_(impl_->read.pending)
527         , pg1_(impl_->write.pending)
528     {
529         if(state().timer.expiry() != stream_base::never())
530         {
531             BOOST_ASIO_HANDLER_LOCATION((
532                 __FILE__, __LINE__,
533                 "basic_stream::async_connect"));
534 
535             impl_->write.timer.async_wait(
536                 timeout_handler<decltype(this->get_executor())>{
537                     state(),
538                     impl_,
539                     state().tick,
540                     this->get_executor()});
541         }
542 
543         BOOST_ASIO_HANDLER_LOCATION((
544             __FILE__, __LINE__,
545             "basic_stream::async_connect"));
546 
547         net::async_connect(impl_->socket,
548             begin, end, cond, std::move(*this));
549         // *this is now moved-from
550     }
551 
552     template<class... Args>
553     void
operator ()(error_code ec,Args &&...args)554     operator()(error_code ec, Args&&... args)
555     {
556         if(state().timer.expiry() != stream_base::never())
557         {
558             ++state().tick;
559 
560             // try cancelling timer
561             auto const n =
562                 impl_->write.timer.cancel();
563             if(n == 0)
564             {
565                 // timeout handler invoked?
566                 if(state().timeout)
567                 {
568                     // yes, socket already closed
569                     ec = beast::error::timeout;
570                     state().timeout = false;
571                 }
572             }
573             else
574             {
575                 BOOST_ASSERT(n == 1);
576                 BOOST_ASSERT(! state().timeout);
577             }
578         }
579 
580         pg0_.reset();
581         pg1_.reset();
582         this->complete_now(ec, std::forward<Args>(args)...);
583     }
584 };
585 
586 struct run_read_op
587 {
588     template<class ReadHandler, class Buffers>
589     void
operator ()boost::beast::basic_stream::ops::run_read_op590     operator()(
591         ReadHandler&& h,
592         basic_stream* s,
593         Buffers const& b)
594     {
595         // If you get an error on the following line it means
596         // that your handler does not meet the documented type
597         // requirements for the handler.
598 
599         static_assert(
600             detail::is_invocable<ReadHandler,
601                 void(error_code, std::size_t)>::value,
602             "ReadHandler type requirements not met");
603 
604         transfer_op<
605             true,
606             Buffers,
607             typename std::decay<ReadHandler>::type>(
608                 std::forward<ReadHandler>(h), *s, b);
609     }
610 };
611 
612 struct run_write_op
613 {
614     template<class WriteHandler, class Buffers>
615     void
operator ()boost::beast::basic_stream::ops::run_write_op616     operator()(
617         WriteHandler&& h,
618         basic_stream* s,
619         Buffers const& b)
620     {
621         // If you get an error on the following line it means
622         // that your handler does not meet the documented type
623         // requirements for the handler.
624 
625         static_assert(
626             detail::is_invocable<WriteHandler,
627                 void(error_code, std::size_t)>::value,
628             "WriteHandler type requirements not met");
629 
630         transfer_op<
631             false,
632             Buffers,
633             typename std::decay<WriteHandler>::type>(
634                 std::forward<WriteHandler>(h), *s, b);
635     }
636 };
637 
638 struct run_connect_op
639 {
640     template<class ConnectHandler>
641     void
operator ()boost::beast::basic_stream::ops::run_connect_op642     operator()(
643         ConnectHandler&& h,
644         basic_stream* s,
645         endpoint_type const& ep)
646     {
647         // If you get an error on the following line it means
648         // that your handler does not meet the documented type
649         // requirements for the handler.
650 
651         static_assert(
652             detail::is_invocable<ConnectHandler,
653                 void(error_code)>::value,
654             "ConnectHandler type requirements not met");
655 
656         connect_op<typename std::decay<ConnectHandler>::type>(
657             std::forward<ConnectHandler>(h), *s, ep);
658     }
659 };
660 
661 struct run_connect_range_op
662 {
663     template<
664         class RangeConnectHandler,
665         class EndpointSequence,
666         class Condition>
667     void
operator ()boost::beast::basic_stream::ops::run_connect_range_op668     operator()(
669         RangeConnectHandler&& h,
670         basic_stream* s,
671         EndpointSequence const& eps,
672         Condition const& cond)
673     {
674         // If you get an error on the following line it means
675         // that your handler does not meet the documented type
676         // requirements for the handler.
677 
678         static_assert(
679             detail::is_invocable<RangeConnectHandler,
680                 void(error_code, typename Protocol::endpoint)>::value,
681             "RangeConnectHandler type requirements not met");
682 
683         connect_op<typename std::decay<RangeConnectHandler>::type>(
684             std::forward<RangeConnectHandler>(h), *s, eps, cond);
685     }
686 };
687 
688 struct run_connect_iter_op
689 {
690     template<
691         class IteratorConnectHandler,
692         class Iterator,
693         class Condition>
694     void
operator ()boost::beast::basic_stream::ops::run_connect_iter_op695     operator()(
696         IteratorConnectHandler&& h,
697         basic_stream* s,
698         Iterator begin, Iterator end,
699         Condition const& cond)
700     {
701         // If you get an error on the following line it means
702         // that your handler does not meet the documented type
703         // requirements for the handler.
704 
705         static_assert(
706             detail::is_invocable<IteratorConnectHandler,
707                 void(error_code, Iterator)>::value,
708             "IteratorConnectHandler type requirements not met");
709 
710         connect_op<typename std::decay<IteratorConnectHandler>::type>(
711             std::forward<IteratorConnectHandler>(h), *s, begin, end, cond);
712     }
713 };
714 
715 };
716 
717 //------------------------------------------------------------------------------
718 
719 template<class Protocol, class Executor, class RatePolicy>
720 basic_stream<Protocol, Executor, RatePolicy>::
~basic_stream()721 ~basic_stream()
722 {
723     // the shared object can outlive *this,
724     // cancel any operations so the shared
725     // object is destroyed as soon as possible.
726     impl_->close();
727 }
728 
729 template<class Protocol, class Executor, class RatePolicy>
730 template<class Arg0, class... Args, class>
731 basic_stream<Protocol, Executor, RatePolicy>::
basic_stream(Arg0 && arg0,Args &&...args)732 basic_stream(Arg0&& arg0, Args&&... args)
733     : impl_(boost::make_shared<impl_type>(
734         std::false_type{},
735         std::forward<Arg0>(arg0),
736         std::forward<Args>(args)...))
737 {
738 }
739 
740 template<class Protocol, class Executor, class RatePolicy>
741 template<class RatePolicy_, class Arg0, class... Args, class>
742 basic_stream<Protocol, Executor, RatePolicy>::
basic_stream(RatePolicy_ && policy,Arg0 && arg0,Args &&...args)743 basic_stream(
744     RatePolicy_&& policy, Arg0&& arg0, Args&&... args)
745     : impl_(boost::make_shared<impl_type>(
746         std::true_type{},
747         std::forward<RatePolicy_>(policy),
748         std::forward<Arg0>(arg0),
749         std::forward<Args>(args)...))
750 {
751 }
752 
753 template<class Protocol, class Executor, class RatePolicy>
754 basic_stream<Protocol, Executor, RatePolicy>::
basic_stream(basic_stream && other)755 basic_stream(basic_stream&& other)
756     : impl_(boost::make_shared<impl_type>(
757         std::move(*other.impl_)))
758 {
759     // Explainer: Asio's sockets provide the guarantee that a moved-from socket
760     // will be in a state as-if newly created. i.e.:
761     // * having the same (valid) executor
762     // * the socket shall not be open
763     // We provide the same guarantee by moving the impl rather than the pointer
764     // controlling its lifetime.
765 }
766 
767 //------------------------------------------------------------------------------
768 
769 template<class Protocol, class Executor, class RatePolicy>
770 auto
771 basic_stream<Protocol, Executor, RatePolicy>::
release_socket()772 release_socket() ->
773     socket_type
774 {
775     this->cancel();
776     return std::move(impl_->socket);
777 }
778 
779 template<class Protocol, class Executor, class RatePolicy>
780 void
781 basic_stream<Protocol, Executor, RatePolicy>::
expires_after(net::steady_timer::duration expiry_time)782 expires_after(net::steady_timer::duration expiry_time)
783 {
784     // If assert goes off, it means that there are
785     // already read or write (or connect) operations
786     // outstanding, so there is nothing to apply
787     // the expiration time to!
788     //
789     BOOST_ASSERT(
790         ! impl_->read.pending ||
791         ! impl_->write.pending);
792 
793     if(! impl_->read.pending)
794         BOOST_VERIFY(
795             impl_->read.timer.expires_after(
796                 expiry_time) == 0);
797 
798     if(! impl_->write.pending)
799         BOOST_VERIFY(
800             impl_->write.timer.expires_after(
801                 expiry_time) == 0);
802 }
803 
804 template<class Protocol, class Executor, class RatePolicy>
805 void
806 basic_stream<Protocol, Executor, RatePolicy>::
expires_at(net::steady_timer::time_point expiry_time)807 expires_at(
808     net::steady_timer::time_point expiry_time)
809 {
810     // If assert goes off, it means that there are
811     // already read or write (or connect) operations
812     // outstanding, so there is nothing to apply
813     // the expiration time to!
814     //
815     BOOST_ASSERT(
816         ! impl_->read.pending ||
817         ! impl_->write.pending);
818 
819     if(! impl_->read.pending)
820         BOOST_VERIFY(
821             impl_->read.timer.expires_at(
822                 expiry_time) == 0);
823 
824     if(! impl_->write.pending)
825         BOOST_VERIFY(
826             impl_->write.timer.expires_at(
827                 expiry_time) == 0);
828 }
829 
830 template<class Protocol, class Executor, class RatePolicy>
831 void
832 basic_stream<Protocol, Executor, RatePolicy>::
expires_never()833 expires_never()
834 {
835     impl_->reset();
836 }
837 
838 template<class Protocol, class Executor, class RatePolicy>
839 void
840 basic_stream<Protocol, Executor, RatePolicy>::
cancel()841 cancel()
842 {
843     error_code ec;
844     impl_->socket.cancel(ec);
845     impl_->timer.cancel();
846 }
847 
848 template<class Protocol, class Executor, class RatePolicy>
849 void
850 basic_stream<Protocol, Executor, RatePolicy>::
close()851 close()
852 {
853     impl_->close();
854 }
855 
856 //------------------------------------------------------------------------------
857 
858 template<class Protocol, class Executor, class RatePolicy>
859 template<BOOST_BEAST_ASYNC_TPARAM1 ConnectHandler>
BOOST_BEAST_ASYNC_RESULT1(ConnectHandler)860 BOOST_BEAST_ASYNC_RESULT1(ConnectHandler)
861 basic_stream<Protocol, Executor, RatePolicy>::
862 async_connect(
863     endpoint_type const& ep,
864     ConnectHandler&& handler)
865 {
866     return net::async_initiate<
867         ConnectHandler,
868         void(error_code)>(
869             typename ops::run_connect_op{},
870             handler,
871             this,
872             ep);
873 }
874 
875 template<class Protocol, class Executor, class RatePolicy>
876 template<
877     class EndpointSequence,
878     BOOST_ASIO_COMPLETION_TOKEN_FOR(void(error_code, typename Protocol::endpoint)) RangeConnectHandler,
879     class>
BOOST_ASIO_INITFN_RESULT_TYPE(RangeConnectHandler,void (error_code,typename Protocol::endpoint))880 BOOST_ASIO_INITFN_RESULT_TYPE(RangeConnectHandler,void(error_code, typename Protocol::endpoint))
881 basic_stream<Protocol, Executor, RatePolicy>::
882 async_connect(
883     EndpointSequence const& endpoints,
884     RangeConnectHandler&& handler)
885 {
886     return net::async_initiate<
887         RangeConnectHandler,
888         void(error_code, typename Protocol::endpoint)>(
889             typename ops::run_connect_range_op{},
890             handler,
891             this,
892             endpoints,
893             detail::any_endpoint{});
894 }
895 
896 template<class Protocol, class Executor, class RatePolicy>
897 template<
898     class EndpointSequence,
899     class ConnectCondition,
900     BOOST_ASIO_COMPLETION_TOKEN_FOR(void(error_code, typename Protocol::endpoint)) RangeConnectHandler,
901     class>
BOOST_ASIO_INITFN_RESULT_TYPE(RangeConnectHandler,void (error_code,typename Protocol::endpoint))902 BOOST_ASIO_INITFN_RESULT_TYPE(RangeConnectHandler,void (error_code, typename Protocol::endpoint))
903 basic_stream<Protocol, Executor, RatePolicy>::
904 async_connect(
905     EndpointSequence const& endpoints,
906     ConnectCondition connect_condition,
907     RangeConnectHandler&& handler)
908 {
909     return net::async_initiate<
910         RangeConnectHandler,
911         void(error_code, typename Protocol::endpoint)>(
912             typename ops::run_connect_range_op{},
913             handler,
914             this,
915             endpoints,
916             connect_condition);
917 }
918 
919 template<class Protocol, class Executor, class RatePolicy>
920 template<
921     class Iterator,
922     BOOST_ASIO_COMPLETION_TOKEN_FOR(void(error_code, Iterator)) IteratorConnectHandler>
BOOST_ASIO_INITFN_RESULT_TYPE(IteratorConnectHandler,void (error_code,Iterator))923 BOOST_ASIO_INITFN_RESULT_TYPE(IteratorConnectHandler,void (error_code, Iterator))
924 basic_stream<Protocol, Executor, RatePolicy>::
925 async_connect(
926     Iterator begin, Iterator end,
927     IteratorConnectHandler&& handler)
928 {
929     return net::async_initiate<
930         IteratorConnectHandler,
931         void(error_code, Iterator)>(
932             typename ops::run_connect_iter_op{},
933             handler,
934             this,
935             begin, end,
936             detail::any_endpoint{});
937 }
938 
939 template<class Protocol, class Executor, class RatePolicy>
940 template<
941     class Iterator,
942     class ConnectCondition,
943     BOOST_ASIO_COMPLETION_TOKEN_FOR(void(error_code, Iterator)) IteratorConnectHandler>
BOOST_ASIO_INITFN_RESULT_TYPE(IteratorConnectHandler,void (error_code,Iterator))944 BOOST_ASIO_INITFN_RESULT_TYPE(IteratorConnectHandler,void (error_code, Iterator))
945 basic_stream<Protocol, Executor, RatePolicy>::
946 async_connect(
947     Iterator begin, Iterator end,
948     ConnectCondition connect_condition,
949     IteratorConnectHandler&& handler)
950 {
951     return net::async_initiate<
952         IteratorConnectHandler,
953         void(error_code, Iterator)>(
954             typename ops::run_connect_iter_op{},
955             handler,
956             this,
957             begin, end,
958             connect_condition);
959 }
960 
961 //------------------------------------------------------------------------------
962 
963 template<class Protocol, class Executor, class RatePolicy>
964 template<class MutableBufferSequence, BOOST_BEAST_ASYNC_TPARAM2 ReadHandler>
BOOST_BEAST_ASYNC_RESULT2(ReadHandler)965 BOOST_BEAST_ASYNC_RESULT2(ReadHandler)
966 basic_stream<Protocol, Executor, RatePolicy>::
967 async_read_some(
968     MutableBufferSequence const& buffers,
969     ReadHandler&& handler)
970 {
971     static_assert(net::is_mutable_buffer_sequence<
972         MutableBufferSequence>::value,
973         "MutableBufferSequence type requirements not met");
974     return net::async_initiate<
975         ReadHandler,
976         void(error_code, std::size_t)>(
977             typename ops::run_read_op{},
978             handler,
979             this,
980             buffers);
981 }
982 
983 template<class Protocol, class Executor, class RatePolicy>
984 template<class ConstBufferSequence, BOOST_BEAST_ASYNC_TPARAM2 WriteHandler>
BOOST_BEAST_ASYNC_RESULT2(WriteHandler)985 BOOST_BEAST_ASYNC_RESULT2(WriteHandler)
986 basic_stream<Protocol, Executor, RatePolicy>::
987 async_write_some(
988     ConstBufferSequence const& buffers,
989     WriteHandler&& handler)
990 {
991     static_assert(net::is_const_buffer_sequence<
992         ConstBufferSequence>::value,
993         "ConstBufferSequence type requirements not met");
994     return net::async_initiate<
995         WriteHandler,
996         void(error_code, std::size_t)>(
997             typename ops::run_write_op{},
998             handler,
999             this,
1000             buffers);
1001 }
1002 
1003 //------------------------------------------------------------------------------
1004 //
1005 // Customization points
1006 //
1007 
1008 #if ! BOOST_BEAST_DOXYGEN
1009 
1010 template<
1011     class Protocol, class Executor, class RatePolicy>
1012 void
beast_close_socket(basic_stream<Protocol,Executor,RatePolicy> & stream)1013 beast_close_socket(
1014     basic_stream<Protocol, Executor, RatePolicy>& stream)
1015 {
1016     error_code ec;
1017     stream.socket().close(ec);
1018 }
1019 
1020 template<
1021     class Protocol, class Executor, class RatePolicy>
1022 void
teardown(role_type role,basic_stream<Protocol,Executor,RatePolicy> & stream,error_code & ec)1023 teardown(
1024     role_type role,
1025     basic_stream<Protocol, Executor, RatePolicy>& stream,
1026     error_code& ec)
1027 {
1028     using beast::websocket::teardown;
1029     teardown(role, stream.socket(), ec);
1030 }
1031 
1032 template<
1033     class Protocol, class Executor, class RatePolicy,
1034     class TeardownHandler>
1035 void
async_teardown(role_type role,basic_stream<Protocol,Executor,RatePolicy> & stream,TeardownHandler && handler)1036 async_teardown(
1037     role_type role,
1038     basic_stream<Protocol, Executor, RatePolicy>& stream,
1039     TeardownHandler&& handler)
1040 {
1041     using beast::websocket::async_teardown;
1042     async_teardown(role, stream.socket(),
1043         std::forward<TeardownHandler>(handler));
1044 }
1045 
1046 #endif
1047 
1048 } // beast
1049 } // boost
1050 
1051 #endif
1052