1 #pragma once 2 #include <vector> 3 #include <unordered_set> 4 #include <string> 5 #include <queue> 6 #include <thread> 7 #include <atomic> 8 #include <memory> 9 #include <mutex> 10 #include <condition_variable> 11 #include <boost/icl/interval_map.hpp> 12 #include "ChannelMap.h" 13 #include "core/global.h" 14 #include "util/Datagram.h" 15 #include "util/DatagramIterator.h" 16 #include "util/TaskQueue.h" 17 #include "net/NetworkAcceptor.h" 18 19 class MDParticipantInterface; 20 class MDUpstream; 21 22 // A MessageDirector is the internal networking object for an Astron server-node. 23 // The MessageDirector receives message from other servers and routes them to the 24 // Client Agent, State Server, DB Server, DB-SS, and other server-nodes as necessary. 25 class MessageDirector final : public ChannelMap 26 { 27 public: 28 ~MessageDirector(); 29 30 // init_network causes the MessageDirector to start listening for 31 // messages if it hasn't already been initialized. 32 void init_network(); 33 static MessageDirector singleton; 34 35 // route_datagram accepts any Astron message (a Datagram), and 36 // properly routes it to any subscribed listeners. 37 // Message on the CONTROL_MESSAGE channel are processed internally by the MessageDirector. 38 void route_datagram(MDParticipantInterface *p, DatagramHandle dg); 39 40 // logger returns the MessageDirector log category. logger()41 inline LogCategory& logger() 42 { 43 return m_log; 44 } 45 46 // For MDUpstream (and subclasses) to call. 47 void receive_datagram(DatagramHandle dg); 48 void receive_disconnect(const uvw::ErrorEvent &evt); 49 50 protected: 51 void on_add_channel(channel_t c); 52 void on_remove_channel(channel_t c); 53 void on_add_range(channel_t lo, channel_t hi); 54 void on_remove_range(channel_t lo, channel_t hi); 55 56 private: 57 MessageDirector(); 58 59 bool m_initialized; 60 61 std::unique_ptr<NetworkAcceptor> m_net_acceptor; 62 MDUpstream *m_upstream; 63 64 // Connected participants 65 std::unordered_set<MDParticipantInterface*> m_participants; 66 std::unordered_set<MDParticipantInterface*> m_terminated_participants; 67 68 // Threading stuff: 69 bool m_shutdown; 70 bool m_main_is_routing; 71 std::unique_ptr<std::thread> m_thread; 72 std::mutex m_participants_lock; 73 std::mutex m_terminated_lock; 74 std::mutex m_messages_lock; 75 std::queue<std::pair<MDParticipantInterface *, DatagramHandle>> m_messages; 76 std::condition_variable m_cv; 77 78 void flush_queue(); 79 void process_datagram(MDParticipantInterface *p, DatagramHandle dg); 80 void process_terminates(); 81 void routing_thread(); 82 void shutdown_threading(); 83 84 LogCategory m_log; 85 86 friend class MDParticipantInterface; 87 void add_participant(MDParticipantInterface* participant); 88 void remove_participant(MDParticipantInterface* participant); 89 void preroute_post_remove(channel_t sender, DatagramHandle dg); 90 void recall_post_removes(channel_t sender); 91 92 // I/O OPERATIONS 93 void handle_connection(const std::shared_ptr<uvw::TcpHandle> &socket); 94 void handle_error(const uvw::ErrorEvent& evt); 95 }; 96 97 98 99 // A MDParticipantInterface is the interface that must be implemented to 100 // receive messages from the MessageDirector. 101 // MDParticipants might be a StateServer, a single StateServer object, the 102 // DB-server, or etc. which are on the node and will be transferred 103 // internally. Another server with its own MessageDirector also would 104 // be an MDParticipant. 105 class MDParticipantInterface : public ChannelSubscriber 106 { 107 friend class MessageDirector; 108 109 public: MDParticipantInterface()110 MDParticipantInterface() 111 { 112 MessageDirector::singleton.add_participant(this); 113 } 114 115 // handle_datagram should handle a message received from the MessageDirector. 116 // Implementations of handle_datagram should be non-blocking operations. 117 virtual void handle_datagram(DatagramHandle dg, DatagramIterator &dgi) = 0; 118 119 // post_remove tells the MDParticipant to handle all of its post remove packets. post_remove()120 inline void post_remove() 121 { 122 logger().debug() << "MDParticipant '" << m_name << "' sending post removes..." << std::endl; 123 for(auto sender_it = m_post_removes.begin(); sender_it != m_post_removes.end(); ++sender_it) { 124 // Route datagrams for the sender 125 std::vector<DatagramHandle>& datagrams = sender_it->second; 126 for(auto dg = datagrams.begin(); dg != datagrams.end(); ++dg) { 127 route_datagram(*dg); 128 } 129 130 // Clear the post removes from pre-routed tables 131 MessageDirector::singleton.recall_post_removes(sender_it->first); 132 } 133 } 134 135 // terminate cleans up the participant's subscriptions and signals 136 // the message director that the object is ready for deletion. terminate()137 inline void terminate() 138 { 139 if(!m_is_terminated.exchange(true)) { 140 MessageDirector::singleton.remove_participant(this); 141 } 142 } is_terminated()143 inline bool is_terminated() 144 { 145 return m_is_terminated; 146 } 147 148 protected: route_datagram(DatagramHandle dg)149 inline void route_datagram(DatagramHandle dg) 150 { 151 MessageDirector::singleton.route_datagram(this, dg); 152 } subscribe_channel(channel_t c)153 inline void subscribe_channel(channel_t c) 154 { 155 logger().trace() << "MDParticipant '" << m_name << "' subscribed channel: " << c << std::endl; 156 MessageDirector::singleton.subscribe_channel(this, c); 157 } unsubscribe_channel(channel_t c)158 inline void unsubscribe_channel(channel_t c) 159 { 160 logger().trace() << "MDParticipant '" << m_name << "' unsubscribed channel: " << c << std::endl; 161 MessageDirector::singleton.unsubscribe_channel(this, c); 162 } subscribe_range(channel_t lo,channel_t hi)163 inline void subscribe_range(channel_t lo, channel_t hi) 164 { 165 logger().trace() << "MDParticipant '" << m_name << "' subscribed range, " 166 << "lo: " << lo << ", hi: " << hi << std::endl; 167 MessageDirector::singleton.subscribe_range(this, lo, hi); 168 } unsubscribe_range(channel_t lo,channel_t hi)169 inline void unsubscribe_range(channel_t lo, channel_t hi) 170 { 171 logger().trace() << "MDParticipant '" << m_name << "' unsubscribed range, " 172 << "lo: " << lo << ", hi: " << hi << std::endl; 173 MessageDirector::singleton.unsubscribe_range(this, lo, hi); 174 } unsubscribe_all()175 inline void unsubscribe_all() 176 { 177 logger().trace() << "MDParticipant '" << m_name << "' unsubscribing from all.\n"; 178 MessageDirector::singleton.unsubscribe_all(this); 179 } add_post_remove(channel_t sender,DatagramHandle dg)180 inline void add_post_remove(channel_t sender, DatagramHandle dg) 181 { 182 logger().trace() << "MDParticipant '" << m_name << "' added post remove." << std::endl; 183 m_post_removes[sender].push_back(dg); 184 MessageDirector::singleton.preroute_post_remove(sender, dg); 185 } clear_post_removes(channel_t sender)186 inline void clear_post_removes(channel_t sender) 187 { 188 logger().trace() << "MDParticipant '" << m_name << "' cleared post removes." << std::endl; 189 m_post_removes.erase(sender); 190 MessageDirector::singleton.recall_post_removes(sender); 191 } set_con_name(const std::string & name)192 inline void set_con_name(const std::string &name) 193 { 194 m_name = name; 195 } set_con_url(const std::string & url)196 inline void set_con_url(const std::string &url) 197 { 198 m_url = url; 199 } log_message(std::vector<uint8_t> message)200 inline void log_message(std::vector<uint8_t> message) 201 { 202 g_eventsender.send(Datagram::create(message)); 203 } logger()204 inline LogCategory logger() 205 { 206 return MessageDirector::singleton.logger(); 207 } 208 209 private: 210 // The messages to be distributed on unexpected disconnect. 211 std::unordered_map<channel_t, std::vector<DatagramHandle> > m_post_removes; 212 std::atomic<bool> m_is_terminated {false}; 213 std::string m_name; 214 std::string m_url; 215 }; 216 217 // This class abstractly represents an "upstream" link on the Message Director. 218 // All messages routed on the Message Director will be sent to the upstream link, 219 // except for messages that originated on the link to begin with. 220 class MDUpstream 221 { 222 public: 223 virtual void subscribe_channel(channel_t c) = 0; 224 virtual void unsubscribe_channel(channel_t c) = 0; 225 virtual void subscribe_range(channel_t lo, channel_t hi) = 0; 226 virtual void unsubscribe_range(channel_t lo, channel_t hi) = 0; 227 virtual void handle_datagram(DatagramHandle dg) = 0; 228 }; 229