1 #include <boost/asio/defer.hpp>
2 #include <boost/asio/executor.hpp>
3 #include <boost/asio/post.hpp>
4 #include <boost/asio/strand.hpp>
5 #include <boost/asio/system_executor.hpp>
6 #include <condition_variable>
7 #include <deque>
8 #include <memory>
9 #include <mutex>
10 #include <typeinfo>
11 #include <vector>
12 
13 using boost::asio::defer;
14 using boost::asio::executor;
15 using boost::asio::post;
16 using boost::asio::strand;
17 using boost::asio::system_executor;
18 
19 //------------------------------------------------------------------------------
20 // A tiny actor framework
21 // ~~~~~~~~~~~~~~~~~~~~~~
22 
class actor;

// Actors are addressed by pointer. An address names the sender or the
// recipient of a message.
using actor_address = actor*;
27 
// Common polymorphic base of every registered message handler.
class message_handler_base
{
public:
  virtual ~message_handler_base() = default;

  // Identifies the message type accepted by this handler. It is compared with
  // the type of an incoming message to decide whether the handler receives it.
  virtual const std::type_info& message_id() const = 0;
};
37 
38 // Base class for a handler for a specific message type.
39 template <class Message>
40 class message_handler : public message_handler_base
41 {
42 public:
43   // Handle an incoming message.
44   virtual void handle_message(Message msg, actor_address from) = 0;
45 };
46 
47 // Concrete message handler for a specific message type.
48 template <class Actor, class Message>
49 class mf_message_handler : public message_handler<Message>
50 {
51 public:
52   // Construct a message handler to invoke the specified member function.
mf_message_handler(void (Actor::* mf)(Message,actor_address),Actor * a)53   mf_message_handler(void (Actor::* mf)(Message, actor_address), Actor* a)
54     : function_(mf), actor_(a)
55   {
56   }
57 
58   // Used to determine which message handlers receive an incoming message.
message_id() const59   virtual const std::type_info& message_id() const
60   {
61     return typeid(Message);
62   }
63 
64   // Handle an incoming message.
handle_message(Message msg,actor_address from)65   virtual void handle_message(Message msg, actor_address from)
66   {
67     (actor_->*function_)(std::move(msg), from);
68   }
69 
70   // Determine whether the message handler represents the specified function.
is_function(void (Actor::* mf)(Message,actor_address)) const71   bool is_function(void (Actor::* mf)(Message, actor_address)) const
72   {
73     return mf == function_;
74   }
75 
76 private:
77   void (Actor::* function_)(Message, actor_address);
78   Actor* actor_;
79 };
80 
81 // Base class for all actors.
82 class actor
83 {
84 public:
~actor()85   virtual ~actor()
86   {
87   }
88 
89   // Obtain the actor's address for use as a message sender or recipient.
address()90   actor_address address()
91   {
92     return this;
93   }
94 
95   // Send a message from one actor to another.
96   template <class Message>
send(Message msg,actor_address from,actor_address to)97   friend void send(Message msg, actor_address from, actor_address to)
98   {
99     // Execute the message handler in the context of the target's executor.
100     post(to->executor_,
101       [=]
102       {
103         to->call_handler(std::move(msg), from);
104       });
105   }
106 
107 protected:
108   // Construct the actor to use the specified executor for all message handlers.
actor(executor e)109   actor(executor e)
110     : executor_(std::move(e))
111   {
112   }
113 
114   // Register a handler for a specific message type. Duplicates are permitted.
115   template <class Actor, class Message>
register_handler(void (Actor::* mf)(Message,actor_address))116   void register_handler(void (Actor::* mf)(Message, actor_address))
117   {
118     handlers_.push_back(
119       std::make_shared<mf_message_handler<Actor, Message>>(
120         mf, static_cast<Actor*>(this)));
121   }
122 
123   // Deregister a handler. Removes only the first matching handler.
124   template <class Actor, class Message>
deregister_handler(void (Actor::* mf)(Message,actor_address))125   void deregister_handler(void (Actor::* mf)(Message, actor_address))
126   {
127     const std::type_info& id = typeid(message_handler<Message>);
128     for (auto iter = handlers_.begin(); iter != handlers_.end(); ++iter)
129     {
130       if ((*iter)->message_id() == id)
131       {
132         auto mh = static_cast<mf_message_handler<Actor, Message>*>(iter->get());
133         if (mh->is_function(mf))
134         {
135           handlers_.erase(iter);
136           return;
137         }
138       }
139     }
140   }
141 
142   // Send a message from within a message handler.
143   template <class Message>
tail_send(Message msg,actor_address to)144   void tail_send(Message msg, actor_address to)
145   {
146     // Execute the message handler in the context of the target's executor.
147     actor* from = this;
148     defer(to->executor_,
149       [=]
150       {
151         to->call_handler(std::move(msg), from);
152       });
153   }
154 
155 private:
156   // Find the matching message handlers, if any, and call them.
157   template <class Message>
call_handler(Message msg,actor_address from)158   void call_handler(Message msg, actor_address from)
159   {
160     const std::type_info& message_id = typeid(Message);
161     for (auto& h: handlers_)
162     {
163       if (h->message_id() == message_id)
164       {
165         auto mh = static_cast<message_handler<Message>*>(h.get());
166         mh->handle_message(msg, from);
167       }
168     }
169   }
170 
171   // All messages associated with a single actor object should be processed
172   // non-concurrently. We use a strand to ensure non-concurrent execution even
173   // if the underlying executor may use multiple threads.
174   strand<executor> executor_;
175 
176   std::vector<std::shared_ptr<message_handler_base>> handlers_;
177 };
178 
179 // A concrete actor that allows synchronous message retrieval.
180 template <class Message>
181 class receiver : public actor
182 {
183 public:
receiver()184   receiver()
185     : actor(system_executor())
186   {
187     register_handler(&receiver::message_handler);
188   }
189 
190   // Block until a message has been received.
wait()191   Message wait()
192   {
193     std::unique_lock<std::mutex> lock(mutex_);
194     condition_.wait(lock, [this]{ return !message_queue_.empty(); });
195     Message msg(std::move(message_queue_.front()));
196     message_queue_.pop_front();
197     return msg;
198   }
199 
200 private:
201   // Handle a new message by adding it to the queue and waking a waiter.
message_handler(Message msg,actor_address)202   void message_handler(Message msg, actor_address /* from */)
203   {
204     std::lock_guard<std::mutex> lock(mutex_);
205     message_queue_.push_back(std::move(msg));
206     condition_.notify_one();
207   }
208 
209   std::mutex mutex_;
210   std::condition_variable condition_;
211   std::deque<Message> message_queue_;
212 };
213 
214 //------------------------------------------------------------------------------
215 
216 #include <boost/asio/thread_pool.hpp>
217 #include <iostream>
218 
219 using boost::asio::thread_pool;
220 
221 class member : public actor
222 {
223 public:
member(executor e)224   explicit member(executor e)
225     : actor(std::move(e))
226   {
227     register_handler(&member::init_handler);
228   }
229 
230 private:
init_handler(actor_address next,actor_address from)231   void init_handler(actor_address next, actor_address from)
232   {
233     next_ = next;
234     caller_ = from;
235 
236     register_handler(&member::token_handler);
237     deregister_handler(&member::init_handler);
238   }
239 
token_handler(int token,actor_address)240   void token_handler(int token, actor_address /*from*/)
241   {
242     int msg(token);
243     actor_address to(caller_);
244 
245     if (token > 0)
246     {
247       msg = token - 1;
248       to = next_;
249     }
250 
251     tail_send(msg, to);
252   }
253 
254   actor_address next_;
255   actor_address caller_;
256 };
257 
main()258 int main()
259 {
260   const std::size_t num_threads = 16;
261   const int num_hops = 50000000;
262   const std::size_t num_actors = 503;
263   const int token_value = (num_hops + num_actors - 1) / num_actors;
264   const std::size_t actors_per_thread = num_actors / num_threads;
265 
266   struct single_thread_pool : thread_pool { single_thread_pool() : thread_pool(1) {} };
267   single_thread_pool pools[num_threads];
268   std::vector<std::shared_ptr<member>> members(num_actors);
269   receiver<int> rcvr;
270 
271   // Create the member actors.
272   for (std::size_t i = 0; i < num_actors; ++i)
273     members[i] = std::make_shared<member>(pools[(i / actors_per_thread) % num_threads].get_executor());
274 
275   // Initialise the actors by passing each one the address of the next actor in the ring.
276   for (std::size_t i = num_actors, next_i = 0; i > 0; next_i = --i)
277     send(members[next_i]->address(), rcvr.address(), members[i - 1]->address());
278 
279   // Send exactly one token to each actor, all with the same initial value, rounding up if required.
280   for (std::size_t i = 0; i < num_actors; ++i)
281     send(token_value, rcvr.address(), members[i]->address());
282 
283   // Wait for all signal messages, indicating the tokens have all reached zero.
284   for (std::size_t i = 0; i < num_actors; ++i)
285     rcvr.wait();
286 }
287