1 //
2 // composed_7.cpp
3 // ~~~~~~~~~~~~~~
4 //
5 // Copyright (c) 2003-2020 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6 //
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 //
10 
11 #include <boost/asio/compose.hpp>
12 #include <boost/asio/io_context.hpp>
13 #include <boost/asio/ip/tcp.hpp>
14 #include <boost/asio/steady_timer.hpp>
15 #include <boost/asio/use_future.hpp>
16 #include <boost/asio/write.hpp>
17 #include <functional>
18 #include <iostream>
19 #include <memory>
20 #include <sstream>
21 #include <string>
22 #include <type_traits>
23 #include <utility>
24 
25 using boost::asio::ip::tcp;
26 
27 // NOTE: This example requires the new boost::asio::async_compose function. For
28 // an example that works with the Networking TS style of completion tokens,
29 // please see an older version of asio.
30 
31 //------------------------------------------------------------------------------
32 
// This composed operation shows composition of multiple underlying operations.
// It automatically serialises a message, using its I/O streams insertion
// operator, before sending it N times on the socket. To do this, it must
// allocate a buffer for the encoded message and ensure this buffer's validity
// until all underlying async_write operations complete. A one second delay is
// inserted prior to each write operation, using a steady_timer.
39 
40 template <typename T, typename CompletionToken>
async_write_messages(tcp::socket & socket,const T & message,std::size_t repeat_count,CompletionToken && token)41 auto async_write_messages(tcp::socket& socket,
42     const T& message, std::size_t repeat_count,
43     CompletionToken&& token)
44   // The return type of the initiating function is deduced from the combination
45   // of CompletionToken type and the completion handler's signature. When the
46   // completion token is a simple callback, the return type is always void.
47   // In this example, when the completion token is boost::asio::yield_context
48   // (used for stackful coroutines) the return type would be also be void, as
49   // there is no non-error argument to the completion handler. When the
50   // completion token is boost::asio::use_future it would be std::future<void>.
51   //
52   // In C++14 we can omit the return type as it is automatically deduced from
53   // the return type of boost::asio::async_initiate.
54 {
55   // Encode the message and copy it into an allocated buffer. The buffer will
56   // be maintained for the lifetime of the composed asynchronous operation.
57   std::ostringstream os;
58   os << message;
59   std::unique_ptr<std::string> encoded_message(new std::string(os.str()));
60 
61   // Create a steady_timer to be used for the delay between messages.
62   std::unique_ptr<boost::asio::steady_timer> delay_timer(
63       new boost::asio::steady_timer(socket.get_executor()));
64 
65   // To manage the cycle between the multiple underlying asychronous
66   // operations, our implementation is a state machine.
67   enum { starting, waiting, writing };
68 
69   // The boost::asio::async_compose function takes:
70   //
71   // - our asynchronous operation implementation,
72   // - the completion token,
73   // - the completion handler signature, and
74   // - any I/O objects (or executors) used by the operation
75   //
76   // It then wraps our implementation, which is implemented here as a state
77   // machine in a lambda, in an intermediate completion handler that meets the
78   // requirements of a conforming asynchronous operation. This includes
79   // tracking outstanding work against the I/O executors associated with the
80   // operation (in this example, this is the socket's executor).
81   //
82   // The first argument to our lambda is a reference to the enclosing
83   // intermediate completion handler. This intermediate completion handler is
84   // provided for us by the boost::asio::async_compose function, and takes care
85   // of all the details required to implement a conforming asynchronous
86   // operation. When calling an underlying asynchronous operation, we pass it
87   // this enclosing intermediate completion handler as the completion token.
88   //
89   // All arguments to our lambda after the first must be defaulted to allow the
90   // state machine to be started, as well as to allow the completion handler to
91   // match the completion signature of both the async_write and
92   // steady_timer::async_wait operations.
93   return boost::asio::async_compose<
94     CompletionToken, void(boost::system::error_code)>(
95       [
96         // The implementation holds a reference to the socket as it is used for
97         // multiple async_write operations.
98         &socket,
99 
100         // The allocated buffer for the encoded message. The std::unique_ptr
101         // smart pointer is move-only, and as a consequence our lambda
102         // implementation is also move-only.
103         encoded_message = std::move(encoded_message),
104 
105         // The repeat count remaining.
106         repeat_count,
107 
108         // A steady timer used for introducing a delay.
109         delay_timer = std::move(delay_timer),
110 
111         // To manage the cycle between the multiple underlying asychronous
112         // operations, our implementation is a state machine.
113         state = starting
114       ]
115       (
116         auto& self,
117         const boost::system::error_code& error = {},
118         std::size_t = 0
119       ) mutable
120       {
121         if (!error)
122         {
123           switch (state)
124           {
125           case starting:
126           case writing:
127             if (repeat_count > 0)
128             {
129               --repeat_count;
130               state = waiting;
131               delay_timer->expires_after(std::chrono::seconds(1));
132               delay_timer->async_wait(std::move(self));
133               return; // Composed operation not yet complete.
134             }
135             break; // Composed operation complete, continue below.
136           case waiting:
137             state = writing;
138             boost::asio::async_write(socket,
139                 boost::asio::buffer(*encoded_message), std::move(self));
140             return; // Composed operation not yet complete.
141           }
142         }
143 
144         // This point is reached only on completion of the entire composed
145         // operation.
146 
147         // Deallocate the encoded message and delay timer before calling the
148         // user-supplied completion handler.
149         encoded_message.reset();
150         delay_timer.reset();
151 
152         // Call the user-supplied handler with the result of the operation.
153         self.complete(error);
154       },
155       token, socket);
156 }
157 
158 //------------------------------------------------------------------------------
159 
test_callback()160 void test_callback()
161 {
162   boost::asio::io_context io_context;
163 
164   tcp::acceptor acceptor(io_context, {tcp::v4(), 55555});
165   tcp::socket socket = acceptor.accept();
166 
167   // Test our asynchronous operation using a lambda as a callback.
168   async_write_messages(socket, "Testing callback\r\n", 5,
169       [](const boost::system::error_code& error)
170       {
171         if (!error)
172         {
173           std::cout << "Messages sent\n";
174         }
175         else
176         {
177           std::cout << "Error: " << error.message() << "\n";
178         }
179       });
180 
181   io_context.run();
182 }
183 
184 //------------------------------------------------------------------------------
185 
test_future()186 void test_future()
187 {
188   boost::asio::io_context io_context;
189 
190   tcp::acceptor acceptor(io_context, {tcp::v4(), 55555});
191   tcp::socket socket = acceptor.accept();
192 
193   // Test our asynchronous operation using the use_future completion token.
194   // This token causes the operation's initiating function to return a future,
195   // which may be used to synchronously wait for the result of the operation.
196   std::future<void> f = async_write_messages(
197       socket, "Testing future\r\n", 5, boost::asio::use_future);
198 
199   io_context.run();
200 
201   try
202   {
203     // Get the result of the operation.
204     f.get();
205     std::cout << "Messages sent\n";
206   }
207   catch (const std::exception& e)
208   {
209     std::cout << "Error: " << e.what() << "\n";
210   }
211 }
212 
213 //------------------------------------------------------------------------------
214 
main()215 int main()
216 {
217   test_callback();
218   test_future();
219 }
220