1 //
2 // server.cpp
3 // ~~~~~~~~~~
4 //
5 // Copyright (c) 2003-2018 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6 //
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 //
10 
11 #include <asio.hpp>
12 #include <boost/bind.hpp>
13 #include <boost/shared_ptr.hpp>
14 #include <cmath>
15 #include <cstdlib>
16 #include <exception>
17 #include <iostream>
18 #include <set>
19 #include "protocol.hpp"
20 
21 using asio::ip::tcp;
22 using asio::ip::udp;
23 
24 typedef boost::shared_ptr<tcp::socket> tcp_socket_ptr;
25 typedef boost::shared_ptr<asio::steady_timer> timer_ptr;
26 typedef boost::shared_ptr<control_request> control_request_ptr;
27 
28 class server
29 {
30 public:
31   // Construct the server to wait for incoming control connections.
server(asio::io_context & io_context,unsigned short port)32   server(asio::io_context& io_context, unsigned short port)
33     : acceptor_(io_context, tcp::endpoint(tcp::v4(), port)),
34       timer_(io_context),
35       udp_socket_(io_context, udp::endpoint(udp::v4(), 0)),
36       next_frame_number_(1)
37   {
38     // Start waiting for a new control connection.
39     tcp_socket_ptr new_socket(
40         new tcp::socket(acceptor_.get_executor().context()));
41     acceptor_.async_accept(*new_socket,
42         boost::bind(&server::handle_accept, this,
43           asio::placeholders::error, new_socket));
44 
45     // Start the timer used to generate outgoing frames.
46     timer_.expires_after(asio::chrono::milliseconds(100));
47     timer_.async_wait(boost::bind(&server::handle_timer, this));
48   }
49 
50   // Handle a new control connection.
handle_accept(const asio::error_code & ec,tcp_socket_ptr socket)51   void handle_accept(const asio::error_code& ec, tcp_socket_ptr socket)
52   {
53     if (!ec)
54     {
55       // Start receiving control requests on the connection.
56       control_request_ptr request(new control_request);
57       asio::async_read(*socket, request->to_buffers(),
58           boost::bind(&server::handle_control_request, this,
59             asio::placeholders::error, socket, request));
60     }
61 
62     // Start waiting for a new control connection.
63     tcp_socket_ptr new_socket(
64         new tcp::socket(acceptor_.get_executor().context()));
65     acceptor_.async_accept(*new_socket,
66         boost::bind(&server::handle_accept, this,
67           asio::placeholders::error, new_socket));
68   }
69 
70   // Handle a new control request.
handle_control_request(const asio::error_code & ec,tcp_socket_ptr socket,control_request_ptr request)71   void handle_control_request(const asio::error_code& ec,
72       tcp_socket_ptr socket, control_request_ptr request)
73   {
74     if (!ec)
75     {
76       // Delay handling of the control request to simulate network latency.
77       timer_ptr delay_timer(
78           new asio::steady_timer(acceptor_.get_executor().context()));
79       delay_timer->expires_after(asio::chrono::seconds(2));
80       delay_timer->async_wait(
81           boost::bind(&server::handle_control_request_timer, this,
82             socket, request, delay_timer));
83     }
84   }
85 
handle_control_request_timer(tcp_socket_ptr socket,control_request_ptr request,timer_ptr)86   void handle_control_request_timer(tcp_socket_ptr socket,
87       control_request_ptr request, timer_ptr /*delay_timer*/)
88   {
89     // Determine what address this client is connected from, since
90     // subscriptions must be stored on the server as a complete endpoint, not
91     // just a port. We use the non-throwing overload of remote_endpoint() since
92     // it may fail if the socket is no longer connected.
93     asio::error_code ec;
94     tcp::endpoint remote_endpoint = socket->remote_endpoint(ec);
95     if (!ec)
96     {
97       // Remove old port subscription, if any.
98       if (unsigned short old_port = request->old_port())
99       {
100         udp::endpoint old_endpoint(remote_endpoint.address(), old_port);
101         subscribers_.erase(old_endpoint);
102         std::cout << "Removing subscription " << old_endpoint << std::endl;
103       }
104 
105       // Add new port subscription, if any.
106       if (unsigned short new_port = request->new_port())
107       {
108         udp::endpoint new_endpoint(remote_endpoint.address(), new_port);
109         subscribers_.insert(new_endpoint);
110         std::cout << "Adding subscription " << new_endpoint << std::endl;
111       }
112     }
113 
114     // Wait for next control request on this connection.
115     asio::async_read(*socket, request->to_buffers(),
116         boost::bind(&server::handle_control_request, this,
117           asio::placeholders::error, socket, request));
118   }
119 
120   // Every time the timer fires we will generate a new frame and send it to all
121   // subscribers.
handle_timer()122   void handle_timer()
123   {
124     // Generate payload.
125     double x = next_frame_number_ * 0.2;
126     double y = std::sin(x);
127     int char_index = static_cast<int>((y + 1.0) * (frame::payload_size / 2));
128     std::string payload;
129     for (int i = 0; i < frame::payload_size; ++i)
130       payload += (i == char_index ? '*' : '.');
131 
132     // Create the frame to be sent to all subscribers.
133     frame f(next_frame_number_++, payload);
134 
135     // Send frame to all subscribers. We can use synchronous calls here since
136     // UDP send operations typically do not block.
137     std::set<udp::endpoint>::iterator j;
138     for (j = subscribers_.begin(); j != subscribers_.end(); ++j)
139     {
140       asio::error_code ec;
141       udp_socket_.send_to(f.to_buffers(), *j, 0, ec);
142     }
143 
144     // Wait for next timeout.
145     timer_.expires_after(asio::chrono::milliseconds(100));
146     timer_.async_wait(boost::bind(&server::handle_timer, this));
147   }
148 
149 private:
150   // The acceptor used to accept incoming control connections.
151   tcp::acceptor acceptor_;
152 
153   // The timer used for generating data.
154   asio::steady_timer timer_;
155 
156   // The socket used to send data to subscribers.
157   udp::socket udp_socket_;
158 
159   // The next frame number.
160   unsigned long next_frame_number_;
161 
162   // The set of endpoints that are subscribed.
163   std::set<udp::endpoint> subscribers_;
164 };
165 
main(int argc,char * argv[])166 int main(int argc, char* argv[])
167 {
168   try
169   {
170     if (argc != 2)
171     {
172       std::cerr << "Usage: server <port>\n";
173       return 1;
174     }
175 
176     asio::io_context io_context;
177 
178     using namespace std; // For atoi.
179     server s(io_context, atoi(argv[1]));
180 
181     io_context.run();
182   }
183   catch (std::exception& e)
184   {
185     std::cerr << "Exception: " << e.what() << std::endl;
186   }
187 
188   return 0;
189 }
190