1 #include <boost/asio/defer.hpp>
2 #include <boost/asio/executor.hpp>
3 #include <boost/asio/post.hpp>
4 #include <boost/asio/strand.hpp>
5 #include <boost/asio/system_executor.hpp>
6 #include <condition_variable>
7 #include <deque>
8 #include <memory>
9 #include <mutex>
10 #include <typeinfo>
11 #include <vector>
12
13 using boost::asio::defer;
14 using boost::asio::executor;
15 using boost::asio::post;
16 using boost::asio::strand;
17 using boost::asio::system_executor;
18
19 //------------------------------------------------------------------------------
20 // A tiny actor framework
21 // ~~~~~~~~~~~~~~~~~~~~~~
22
// Forward declaration so that addresses can be named before the class itself.
class actor;

// Identifies the sender and the recipient of a message: an actor's address is
// simply a pointer to the actor object.
using actor_address = actor*;
27
28 // Base class for all registered message handlers.
class message_handler_base
{
public:
  // Polymorphic base class, hence the virtual destructor.
  virtual ~message_handler_base() = default;

  // Identifies the message type of this handler. Used to determine which
  // message handlers receive an incoming message.
  virtual const std::type_info& message_id() const = 0;
};
37
38 // Base class for a handler for a specific message type.
39 template <class Message>
40 class message_handler : public message_handler_base
41 {
42 public:
43 // Handle an incoming message.
44 virtual void handle_message(Message msg, actor_address from) = 0;
45 };
46
47 // Concrete message handler for a specific message type.
48 template <class Actor, class Message>
49 class mf_message_handler : public message_handler<Message>
50 {
51 public:
52 // Construct a message handler to invoke the specified member function.
mf_message_handler(void (Actor::* mf)(Message,actor_address),Actor * a)53 mf_message_handler(void (Actor::* mf)(Message, actor_address), Actor* a)
54 : function_(mf), actor_(a)
55 {
56 }
57
58 // Used to determine which message handlers receive an incoming message.
message_id() const59 virtual const std::type_info& message_id() const
60 {
61 return typeid(Message);
62 }
63
64 // Handle an incoming message.
handle_message(Message msg,actor_address from)65 virtual void handle_message(Message msg, actor_address from)
66 {
67 (actor_->*function_)(std::move(msg), from);
68 }
69
70 // Determine whether the message handler represents the specified function.
is_function(void (Actor::* mf)(Message,actor_address)) const71 bool is_function(void (Actor::* mf)(Message, actor_address)) const
72 {
73 return mf == function_;
74 }
75
76 private:
77 void (Actor::* function_)(Message, actor_address);
78 Actor* actor_;
79 };
80
81 // Base class for all actors.
82 class actor
83 {
84 public:
~actor()85 virtual ~actor()
86 {
87 }
88
89 // Obtain the actor's address for use as a message sender or recipient.
address()90 actor_address address()
91 {
92 return this;
93 }
94
95 // Send a message from one actor to another.
96 template <class Message>
send(Message msg,actor_address from,actor_address to)97 friend void send(Message msg, actor_address from, actor_address to)
98 {
99 // Execute the message handler in the context of the target's executor.
100 post(to->executor_,
101 [=]
102 {
103 to->call_handler(std::move(msg), from);
104 });
105 }
106
107 protected:
108 // Construct the actor to use the specified executor for all message handlers.
actor(executor e)109 actor(executor e)
110 : executor_(std::move(e))
111 {
112 }
113
114 // Register a handler for a specific message type. Duplicates are permitted.
115 template <class Actor, class Message>
register_handler(void (Actor::* mf)(Message,actor_address))116 void register_handler(void (Actor::* mf)(Message, actor_address))
117 {
118 handlers_.push_back(
119 std::make_shared<mf_message_handler<Actor, Message>>(
120 mf, static_cast<Actor*>(this)));
121 }
122
123 // Deregister a handler. Removes only the first matching handler.
124 template <class Actor, class Message>
deregister_handler(void (Actor::* mf)(Message,actor_address))125 void deregister_handler(void (Actor::* mf)(Message, actor_address))
126 {
127 const std::type_info& id = typeid(message_handler<Message>);
128 for (auto iter = handlers_.begin(); iter != handlers_.end(); ++iter)
129 {
130 if ((*iter)->message_id() == id)
131 {
132 auto mh = static_cast<mf_message_handler<Actor, Message>*>(iter->get());
133 if (mh->is_function(mf))
134 {
135 handlers_.erase(iter);
136 return;
137 }
138 }
139 }
140 }
141
142 // Send a message from within a message handler.
143 template <class Message>
tail_send(Message msg,actor_address to)144 void tail_send(Message msg, actor_address to)
145 {
146 // Execute the message handler in the context of the target's executor.
147 actor* from = this;
148 defer(to->executor_,
149 [=]
150 {
151 to->call_handler(std::move(msg), from);
152 });
153 }
154
155 private:
156 // Find the matching message handlers, if any, and call them.
157 template <class Message>
call_handler(Message msg,actor_address from)158 void call_handler(Message msg, actor_address from)
159 {
160 const std::type_info& message_id = typeid(Message);
161 for (auto& h: handlers_)
162 {
163 if (h->message_id() == message_id)
164 {
165 auto mh = static_cast<message_handler<Message>*>(h.get());
166 mh->handle_message(msg, from);
167 }
168 }
169 }
170
171 // All messages associated with a single actor object should be processed
172 // non-concurrently. We use a strand to ensure non-concurrent execution even
173 // if the underlying executor may use multiple threads.
174 strand<executor> executor_;
175
176 std::vector<std::shared_ptr<message_handler_base>> handlers_;
177 };
178
179 // A concrete actor that allows synchronous message retrieval.
180 template <class Message>
181 class receiver : public actor
182 {
183 public:
receiver()184 receiver()
185 : actor(system_executor())
186 {
187 register_handler(&receiver::message_handler);
188 }
189
190 // Block until a message has been received.
wait()191 Message wait()
192 {
193 std::unique_lock<std::mutex> lock(mutex_);
194 condition_.wait(lock, [this]{ return !message_queue_.empty(); });
195 Message msg(std::move(message_queue_.front()));
196 message_queue_.pop_front();
197 return msg;
198 }
199
200 private:
201 // Handle a new message by adding it to the queue and waking a waiter.
message_handler(Message msg,actor_address)202 void message_handler(Message msg, actor_address /* from */)
203 {
204 std::lock_guard<std::mutex> lock(mutex_);
205 message_queue_.push_back(std::move(msg));
206 condition_.notify_one();
207 }
208
209 std::mutex mutex_;
210 std::condition_variable condition_;
211 std::deque<Message> message_queue_;
212 };
213
214 //------------------------------------------------------------------------------
215
216 #include <boost/asio/thread_pool.hpp>
217 #include <iostream>
218
219 using boost::asio::thread_pool;
220
221 class member : public actor
222 {
223 public:
member(executor e)224 explicit member(executor e)
225 : actor(std::move(e))
226 {
227 register_handler(&member::init_handler);
228 }
229
230 private:
init_handler(actor_address next,actor_address from)231 void init_handler(actor_address next, actor_address from)
232 {
233 next_ = next;
234 caller_ = from;
235
236 register_handler(&member::token_handler);
237 deregister_handler(&member::init_handler);
238 }
239
token_handler(int token,actor_address)240 void token_handler(int token, actor_address /*from*/)
241 {
242 int msg(token);
243 actor_address to(caller_);
244
245 if (token > 0)
246 {
247 msg = token - 1;
248 to = next_;
249 }
250
251 tail_send(msg, to);
252 }
253
254 actor_address next_;
255 actor_address caller_;
256 };
257
main()258 int main()
259 {
260 const std::size_t num_threads = 16;
261 const int num_hops = 50000000;
262 const std::size_t num_actors = 503;
263 const int token_value = (num_hops + num_actors - 1) / num_actors;
264 const std::size_t actors_per_thread = num_actors / num_threads;
265
266 struct single_thread_pool : thread_pool { single_thread_pool() : thread_pool(1) {} };
267 single_thread_pool pools[num_threads];
268 std::vector<std::shared_ptr<member>> members(num_actors);
269 receiver<int> rcvr;
270
271 // Create the member actors.
272 for (std::size_t i = 0; i < num_actors; ++i)
273 members[i] = std::make_shared<member>(pools[(i / actors_per_thread) % num_threads].get_executor());
274
275 // Initialise the actors by passing each one the address of the next actor in the ring.
276 for (std::size_t i = num_actors, next_i = 0; i > 0; next_i = --i)
277 send(members[next_i]->address(), rcvr.address(), members[i - 1]->address());
278
279 // Send exactly one token to each actor, all with the same initial value, rounding up if required.
280 for (std::size_t i = 0; i < num_actors; ++i)
281 send(token_value, rcvr.address(), members[i]->address());
282
283 // Wait for all signal messages, indicating the tokens have all reached zero.
284 for (std::size_t i = 0; i < num_actors; ++i)
285 rcvr.wait();
286 }
287