1 #pragma once
2 #include <vector>
3 #include <unordered_set>
4 #include <string>
5 #include <queue>
6 #include <thread>
7 #include <atomic>
8 #include <memory>
9 #include <mutex>
10 #include <condition_variable>
11 #include <boost/icl/interval_map.hpp>
12 #include "ChannelMap.h"
13 #include "core/global.h"
14 #include "util/Datagram.h"
15 #include "util/DatagramIterator.h"
16 #include "util/TaskQueue.h"
17 #include "net/NetworkAcceptor.h"
18 
19 class MDParticipantInterface;
20 class MDUpstream;
21 
22 // A MessageDirector is the internal networking object for an Astron server-node.
23 // The MessageDirector receives message from other servers and routes them to the
24 //     Client Agent, State Server, DB Server, DB-SS, and other server-nodes as necessary.
25 class MessageDirector final : public ChannelMap
26 {
27   public:
28     ~MessageDirector();
29 
30     // init_network causes the MessageDirector to start listening for
31     //     messages if it hasn't already been initialized.
32     void init_network();
33     static MessageDirector singleton;
34 
35     // route_datagram accepts any Astron message (a Datagram), and
36     //     properly routes it to any subscribed listeners.
37     // Message on the CONTROL_MESSAGE channel are processed internally by the MessageDirector.
38     void route_datagram(MDParticipantInterface *p, DatagramHandle dg);
39 
40     // logger returns the MessageDirector log category.
logger()41     inline LogCategory& logger()
42     {
43         return m_log;
44     }
45 
46     // For MDUpstream (and subclasses) to call.
47     void receive_datagram(DatagramHandle dg);
48     void receive_disconnect(const uvw::ErrorEvent &evt);
49 
50   protected:
51     void on_add_channel(channel_t c);
52     void on_remove_channel(channel_t c);
53     void on_add_range(channel_t lo, channel_t hi);
54     void on_remove_range(channel_t lo, channel_t hi);
55 
56   private:
57     MessageDirector();
58 
59     bool m_initialized;
60 
61     std::unique_ptr<NetworkAcceptor> m_net_acceptor;
62     MDUpstream *m_upstream;
63 
64     // Connected participants
65     std::unordered_set<MDParticipantInterface*> m_participants;
66     std::unordered_set<MDParticipantInterface*> m_terminated_participants;
67 
68     // Threading stuff:
69     bool m_shutdown;
70     bool m_main_is_routing;
71     std::unique_ptr<std::thread> m_thread;
72     std::mutex m_participants_lock;
73     std::mutex m_terminated_lock;
74     std::mutex m_messages_lock;
75     std::queue<std::pair<MDParticipantInterface *, DatagramHandle>> m_messages;
76     std::condition_variable m_cv;
77 
78     void flush_queue();
79     void process_datagram(MDParticipantInterface *p, DatagramHandle dg);
80     void process_terminates();
81     void routing_thread();
82     void shutdown_threading();
83 
84     LogCategory m_log;
85 
86     friend class MDParticipantInterface;
87     void add_participant(MDParticipantInterface* participant);
88     void remove_participant(MDParticipantInterface* participant);
89     void preroute_post_remove(channel_t sender, DatagramHandle dg);
90     void recall_post_removes(channel_t sender);
91 
92     // I/O OPERATIONS
93     void handle_connection(const std::shared_ptr<uvw::TcpHandle> &socket);
94     void handle_error(const uvw::ErrorEvent& evt);
95 };
96 
97 
98 
99 // A MDParticipantInterface is the interface that must be implemented to
100 //     receive messages from the MessageDirector.
101 // MDParticipants might be a StateServer, a single StateServer object, the
102 //     DB-server, or etc. which are on the node and will be transferred
103 //     internally.  Another server with its own MessageDirector also would
104 //     be an MDParticipant.
105 class MDParticipantInterface : public ChannelSubscriber
106 {
107     friend class MessageDirector;
108 
109   public:
MDParticipantInterface()110     MDParticipantInterface()
111     {
112         MessageDirector::singleton.add_participant(this);
113     }
114 
115     // handle_datagram should handle a message received from the MessageDirector.
116     // Implementations of handle_datagram should be non-blocking operations.
117     virtual void handle_datagram(DatagramHandle dg, DatagramIterator &dgi) = 0;
118 
119     // post_remove tells the MDParticipant to handle all of its post remove packets.
post_remove()120     inline void post_remove()
121     {
122         logger().debug() << "MDParticipant '" << m_name << "' sending post removes..." << std::endl;
123         for(auto sender_it = m_post_removes.begin(); sender_it != m_post_removes.end(); ++sender_it) {
124             // Route datagrams for the sender
125             std::vector<DatagramHandle>& datagrams = sender_it->second;
126             for(auto dg = datagrams.begin(); dg != datagrams.end(); ++dg) {
127                 route_datagram(*dg);
128             }
129 
130             // Clear the post removes from pre-routed tables
131             MessageDirector::singleton.recall_post_removes(sender_it->first);
132         }
133     }
134 
135     // terminate cleans up the participant's subscriptions and signals
136     //     the message director that the object is ready for deletion.
terminate()137     inline void terminate()
138     {
139         if(!m_is_terminated.exchange(true)) {
140             MessageDirector::singleton.remove_participant(this);
141         }
142     }
is_terminated()143     inline bool is_terminated()
144     {
145         return m_is_terminated;
146     }
147 
148   protected:
route_datagram(DatagramHandle dg)149     inline void route_datagram(DatagramHandle dg)
150     {
151         MessageDirector::singleton.route_datagram(this, dg);
152     }
subscribe_channel(channel_t c)153     inline void subscribe_channel(channel_t c)
154     {
155         logger().trace() << "MDParticipant '" << m_name << "' subscribed channel: " << c << std::endl;
156         MessageDirector::singleton.subscribe_channel(this, c);
157     }
unsubscribe_channel(channel_t c)158     inline void unsubscribe_channel(channel_t c)
159     {
160         logger().trace() << "MDParticipant '" << m_name << "' unsubscribed channel: " << c << std::endl;
161         MessageDirector::singleton.unsubscribe_channel(this, c);
162     }
subscribe_range(channel_t lo,channel_t hi)163     inline void subscribe_range(channel_t lo, channel_t hi)
164     {
165         logger().trace() << "MDParticipant '" << m_name << "' subscribed range, "
166                          << "lo: " << lo << ", hi: " << hi << std::endl;
167         MessageDirector::singleton.subscribe_range(this, lo, hi);
168     }
unsubscribe_range(channel_t lo,channel_t hi)169     inline void unsubscribe_range(channel_t lo, channel_t hi)
170     {
171         logger().trace() << "MDParticipant '" << m_name << "' unsubscribed range, "
172                          << "lo: " << lo << ", hi: " << hi << std::endl;
173         MessageDirector::singleton.unsubscribe_range(this, lo, hi);
174     }
unsubscribe_all()175     inline void unsubscribe_all()
176     {
177         logger().trace() << "MDParticipant '" << m_name << "' unsubscribing from all.\n";
178         MessageDirector::singleton.unsubscribe_all(this);
179     }
add_post_remove(channel_t sender,DatagramHandle dg)180     inline void add_post_remove(channel_t sender, DatagramHandle dg)
181     {
182         logger().trace() << "MDParticipant '" << m_name << "' added post remove." << std::endl;
183         m_post_removes[sender].push_back(dg);
184         MessageDirector::singleton.preroute_post_remove(sender, dg);
185     }
clear_post_removes(channel_t sender)186     inline void clear_post_removes(channel_t sender)
187     {
188         logger().trace() << "MDParticipant '" << m_name << "' cleared post removes." << std::endl;
189         m_post_removes.erase(sender);
190         MessageDirector::singleton.recall_post_removes(sender);
191     }
set_con_name(const std::string & name)192     inline void set_con_name(const std::string &name)
193     {
194         m_name = name;
195     }
set_con_url(const std::string & url)196     inline void set_con_url(const std::string &url)
197     {
198         m_url = url;
199     }
log_message(std::vector<uint8_t> message)200     inline void log_message(std::vector<uint8_t> message)
201     {
202         g_eventsender.send(Datagram::create(message));
203     }
logger()204     inline LogCategory logger()
205     {
206         return MessageDirector::singleton.logger();
207     }
208 
209   private:
210     // The messages to be distributed on unexpected disconnect.
211     std::unordered_map<channel_t, std::vector<DatagramHandle> > m_post_removes;
212     std::atomic<bool> m_is_terminated {false};
213     std::string m_name;
214     std::string m_url;
215 };
216 
217 // This class abstractly represents an "upstream" link on the Message Director.
218 // All messages routed on the Message Director will be sent to the upstream link,
219 // except for messages that originated on the link to begin with.
220 class MDUpstream
221 {
222   public:
223     virtual void subscribe_channel(channel_t c) = 0;
224     virtual void unsubscribe_channel(channel_t c) = 0;
225     virtual void subscribe_range(channel_t lo, channel_t hi) = 0;
226     virtual void unsubscribe_range(channel_t lo, channel_t hi) = 0;
227     virtual void handle_datagram(DatagramHandle dg) = 0;
228 };
229