1 //
2 // server.cpp
3 // ~~~~~~~~~~
4 //
5 // Copyright (c) 2003-2016 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6 //
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 //
10 
11 #include "asio.hpp"
12 #include <algorithm>
13 #include <boost/bind.hpp>
14 #include <iostream>
15 #include <list>
16 #include "handler_allocator.hpp"
17 
18 class session
19 {
20 public:
session(asio::io_context & ioc,size_t block_size)21   session(asio::io_context& ioc, size_t block_size)
22     : io_context_(ioc),
23       strand_(ioc),
24       socket_(ioc),
25       block_size_(block_size),
26       read_data_(new char[block_size]),
27       read_data_length_(0),
28       write_data_(new char[block_size]),
29       unsent_count_(0),
30       op_count_(0)
31   {
32   }
33 
~session()34   ~session()
35   {
36     delete[] read_data_;
37     delete[] write_data_;
38   }
39 
socket()40   asio::ip::tcp::socket& socket()
41   {
42     return socket_;
43   }
44 
start()45   void start()
46   {
47     asio::error_code set_option_err;
48     asio::ip::tcp::no_delay no_delay(true);
49     socket_.set_option(no_delay, set_option_err);
50     if (!set_option_err)
51     {
52       ++op_count_;
53       socket_.async_read_some(asio::buffer(read_data_, block_size_),
54           asio::bind_executor(strand_,
55             make_custom_alloc_handler(read_allocator_,
56               boost::bind(&session::handle_read, this,
57                 asio::placeholders::error,
58                 asio::placeholders::bytes_transferred))));
59     }
60     else
61     {
62       asio::post(io_context_, boost::bind(&session::destroy, this));
63     }
64   }
65 
handle_read(const asio::error_code & err,size_t length)66   void handle_read(const asio::error_code& err, size_t length)
67   {
68     --op_count_;
69 
70     if (!err)
71     {
72       read_data_length_ = length;
73       ++unsent_count_;
74       if (unsent_count_ == 1)
75       {
76         op_count_ += 2;
77         std::swap(read_data_, write_data_);
78         async_write(socket_, asio::buffer(write_data_, read_data_length_),
79             asio::bind_executor(strand_,
80               make_custom_alloc_handler(write_allocator_,
81                 boost::bind(&session::handle_write, this,
82                   asio::placeholders::error))));
83         socket_.async_read_some(asio::buffer(read_data_, block_size_),
84             asio::bind_executor(strand_,
85               make_custom_alloc_handler(read_allocator_,
86                 boost::bind(&session::handle_read, this,
87                   asio::placeholders::error,
88                   asio::placeholders::bytes_transferred))));
89       }
90     }
91 
92     if (op_count_ == 0)
93       asio::post(io_context_, boost::bind(&session::destroy, this));
94   }
95 
handle_write(const asio::error_code & err)96   void handle_write(const asio::error_code& err)
97   {
98     --op_count_;
99 
100     if (!err)
101     {
102       --unsent_count_;
103       if (unsent_count_ == 1)
104       {
105         op_count_ += 2;
106         std::swap(read_data_, write_data_);
107         async_write(socket_, asio::buffer(write_data_, read_data_length_),
108             asio::bind_executor(strand_,
109               make_custom_alloc_handler(write_allocator_,
110                 boost::bind(&session::handle_write, this,
111                   asio::placeholders::error))));
112         socket_.async_read_some(asio::buffer(read_data_, block_size_),
113             asio::bind_executor(strand_,
114               make_custom_alloc_handler(read_allocator_,
115                 boost::bind(&session::handle_read, this,
116                   asio::placeholders::error,
117                   asio::placeholders::bytes_transferred))));
118       }
119     }
120 
121     if (op_count_ == 0)
122       asio::post(io_context_, boost::bind(&session::destroy, this));
123   }
124 
destroy(session * s)125   static void destroy(session* s)
126   {
127     delete s;
128   }
129 
130 private:
131   asio::io_context& io_context_;
132   asio::io_context::strand strand_;
133   asio::ip::tcp::socket socket_;
134   size_t block_size_;
135   char* read_data_;
136   size_t read_data_length_;
137   char* write_data_;
138   int unsent_count_;
139   int op_count_;
140   handler_allocator read_allocator_;
141   handler_allocator write_allocator_;
142 };
143 
144 class server
145 {
146 public:
server(asio::io_context & ioc,const asio::ip::tcp::endpoint & endpoint,size_t block_size)147   server(asio::io_context& ioc, const asio::ip::tcp::endpoint& endpoint,
148       size_t block_size)
149     : io_context_(ioc),
150       acceptor_(ioc),
151       block_size_(block_size)
152   {
153     acceptor_.open(endpoint.protocol());
154     acceptor_.set_option(asio::ip::tcp::acceptor::reuse_address(1));
155     acceptor_.bind(endpoint);
156     acceptor_.listen();
157 
158     start_accept();
159   }
160 
start_accept()161   void start_accept()
162   {
163     session* new_session = new session(io_context_, block_size_);
164     acceptor_.async_accept(new_session->socket(),
165         boost::bind(&server::handle_accept, this, new_session,
166           asio::placeholders::error));
167   }
168 
handle_accept(session * new_session,const asio::error_code & err)169   void handle_accept(session* new_session, const asio::error_code& err)
170   {
171     if (!err)
172     {
173       new_session->start();
174     }
175     else
176     {
177       delete new_session;
178     }
179 
180     start_accept();
181   }
182 
183 private:
184   asio::io_context& io_context_;
185   asio::ip::tcp::acceptor acceptor_;
186   size_t block_size_;
187 };
188 
main(int argc,char * argv[])189 int main(int argc, char* argv[])
190 {
191   try
192   {
193     if (argc != 5)
194     {
195       std::cerr << "Usage: server <address> <port> <threads> <blocksize>\n";
196       return 1;
197     }
198 
199     using namespace std; // For atoi.
200     asio::ip::address address = asio::ip::make_address(argv[1]);
201     short port = atoi(argv[2]);
202     int thread_count = atoi(argv[3]);
203     size_t block_size = atoi(argv[4]);
204 
205     asio::io_context ioc;
206 
207     server s(ioc, asio::ip::tcp::endpoint(address, port), block_size);
208 
209     // Threads not currently supported in this test.
210     std::list<asio::thread*> threads;
211     while (--thread_count > 0)
212     {
213       asio::thread* new_thread = new asio::thread(
214           boost::bind(&asio::io_context::run, &ioc));
215       threads.push_back(new_thread);
216     }
217 
218     ioc.run();
219 
220     while (!threads.empty())
221     {
222       threads.front()->join();
223       delete threads.front();
224       threads.pop_front();
225     }
226   }
227   catch (std::exception& e)
228   {
229     std::cerr << "Exception: " << e.what() << "\n";
230   }
231 
232   return 0;
233 }
234