1 //
2 // server.cpp
3 // ~~~~~~~~~~
4 //
5 // Copyright (c) 2003-2016 Christopher M. Kohlhoff (chris at kohlhoff dot com)
6 //
7 // Distributed under the Boost Software License, Version 1.0. (See accompanying
8 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 //
10
11 #include "asio.hpp"
12 #include <algorithm>
13 #include <boost/bind.hpp>
14 #include <iostream>
15 #include <list>
16 #include "handler_allocator.hpp"
17
18 class session
19 {
20 public:
session(asio::io_context & ioc,size_t block_size)21 session(asio::io_context& ioc, size_t block_size)
22 : io_context_(ioc),
23 strand_(ioc),
24 socket_(ioc),
25 block_size_(block_size),
26 read_data_(new char[block_size]),
27 read_data_length_(0),
28 write_data_(new char[block_size]),
29 unsent_count_(0),
30 op_count_(0)
31 {
32 }
33
~session()34 ~session()
35 {
36 delete[] read_data_;
37 delete[] write_data_;
38 }
39
socket()40 asio::ip::tcp::socket& socket()
41 {
42 return socket_;
43 }
44
start()45 void start()
46 {
47 asio::error_code set_option_err;
48 asio::ip::tcp::no_delay no_delay(true);
49 socket_.set_option(no_delay, set_option_err);
50 if (!set_option_err)
51 {
52 ++op_count_;
53 socket_.async_read_some(asio::buffer(read_data_, block_size_),
54 asio::bind_executor(strand_,
55 make_custom_alloc_handler(read_allocator_,
56 boost::bind(&session::handle_read, this,
57 asio::placeholders::error,
58 asio::placeholders::bytes_transferred))));
59 }
60 else
61 {
62 asio::post(io_context_, boost::bind(&session::destroy, this));
63 }
64 }
65
handle_read(const asio::error_code & err,size_t length)66 void handle_read(const asio::error_code& err, size_t length)
67 {
68 --op_count_;
69
70 if (!err)
71 {
72 read_data_length_ = length;
73 ++unsent_count_;
74 if (unsent_count_ == 1)
75 {
76 op_count_ += 2;
77 std::swap(read_data_, write_data_);
78 async_write(socket_, asio::buffer(write_data_, read_data_length_),
79 asio::bind_executor(strand_,
80 make_custom_alloc_handler(write_allocator_,
81 boost::bind(&session::handle_write, this,
82 asio::placeholders::error))));
83 socket_.async_read_some(asio::buffer(read_data_, block_size_),
84 asio::bind_executor(strand_,
85 make_custom_alloc_handler(read_allocator_,
86 boost::bind(&session::handle_read, this,
87 asio::placeholders::error,
88 asio::placeholders::bytes_transferred))));
89 }
90 }
91
92 if (op_count_ == 0)
93 asio::post(io_context_, boost::bind(&session::destroy, this));
94 }
95
handle_write(const asio::error_code & err)96 void handle_write(const asio::error_code& err)
97 {
98 --op_count_;
99
100 if (!err)
101 {
102 --unsent_count_;
103 if (unsent_count_ == 1)
104 {
105 op_count_ += 2;
106 std::swap(read_data_, write_data_);
107 async_write(socket_, asio::buffer(write_data_, read_data_length_),
108 asio::bind_executor(strand_,
109 make_custom_alloc_handler(write_allocator_,
110 boost::bind(&session::handle_write, this,
111 asio::placeholders::error))));
112 socket_.async_read_some(asio::buffer(read_data_, block_size_),
113 asio::bind_executor(strand_,
114 make_custom_alloc_handler(read_allocator_,
115 boost::bind(&session::handle_read, this,
116 asio::placeholders::error,
117 asio::placeholders::bytes_transferred))));
118 }
119 }
120
121 if (op_count_ == 0)
122 asio::post(io_context_, boost::bind(&session::destroy, this));
123 }
124
destroy(session * s)125 static void destroy(session* s)
126 {
127 delete s;
128 }
129
130 private:
131 asio::io_context& io_context_;
132 asio::io_context::strand strand_;
133 asio::ip::tcp::socket socket_;
134 size_t block_size_;
135 char* read_data_;
136 size_t read_data_length_;
137 char* write_data_;
138 int unsent_count_;
139 int op_count_;
140 handler_allocator read_allocator_;
141 handler_allocator write_allocator_;
142 };
143
144 class server
145 {
146 public:
server(asio::io_context & ioc,const asio::ip::tcp::endpoint & endpoint,size_t block_size)147 server(asio::io_context& ioc, const asio::ip::tcp::endpoint& endpoint,
148 size_t block_size)
149 : io_context_(ioc),
150 acceptor_(ioc),
151 block_size_(block_size)
152 {
153 acceptor_.open(endpoint.protocol());
154 acceptor_.set_option(asio::ip::tcp::acceptor::reuse_address(1));
155 acceptor_.bind(endpoint);
156 acceptor_.listen();
157
158 start_accept();
159 }
160
start_accept()161 void start_accept()
162 {
163 session* new_session = new session(io_context_, block_size_);
164 acceptor_.async_accept(new_session->socket(),
165 boost::bind(&server::handle_accept, this, new_session,
166 asio::placeholders::error));
167 }
168
handle_accept(session * new_session,const asio::error_code & err)169 void handle_accept(session* new_session, const asio::error_code& err)
170 {
171 if (!err)
172 {
173 new_session->start();
174 }
175 else
176 {
177 delete new_session;
178 }
179
180 start_accept();
181 }
182
183 private:
184 asio::io_context& io_context_;
185 asio::ip::tcp::acceptor acceptor_;
186 size_t block_size_;
187 };
188
main(int argc,char * argv[])189 int main(int argc, char* argv[])
190 {
191 try
192 {
193 if (argc != 5)
194 {
195 std::cerr << "Usage: server <address> <port> <threads> <blocksize>\n";
196 return 1;
197 }
198
199 using namespace std; // For atoi.
200 asio::ip::address address = asio::ip::make_address(argv[1]);
201 short port = atoi(argv[2]);
202 int thread_count = atoi(argv[3]);
203 size_t block_size = atoi(argv[4]);
204
205 asio::io_context ioc;
206
207 server s(ioc, asio::ip::tcp::endpoint(address, port), block_size);
208
209 // Threads not currently supported in this test.
210 std::list<asio::thread*> threads;
211 while (--thread_count > 0)
212 {
213 asio::thread* new_thread = new asio::thread(
214 boost::bind(&asio::io_context::run, &ioc));
215 threads.push_back(new_thread);
216 }
217
218 ioc.run();
219
220 while (!threads.empty())
221 {
222 threads.front()->join();
223 delete threads.front();
224 threads.pop_front();
225 }
226 }
227 catch (std::exception& e)
228 {
229 std::cerr << "Exception: " << e.what() << "\n";
230 }
231
232 return 0;
233 }
234