1 /* Copyright (C) 2014 InfiniDB, Inc. 2 3 This program is free software; you can redistribute it and/or 4 modify it under the terms of the GNU General Public License 5 as published by the Free Software Foundation; version 2 of 6 the License. 7 8 This program is distributed in the hope that it will be useful, 9 but WITHOUT ANY WARRANTY; without even the implied warranty of 10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11 GNU General Public License for more details. 12 13 You should have received a copy of the GNU General Public License 14 along with this program; if not, write to the Free Software 15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, 16 MA 02110-1301, USA. */ 17 18 /****************************************************************************** 19 * $Id: masterdbrmnode.h 1823 2013-01-21 14:13:09Z rdempsey $ 20 * 21 *****************************************************************************/ 22 23 /** @file 24 * class MasterDBRMNode interface 25 */ 26 27 #ifndef MASTERDBRMNODE_H_ 28 #define MASTERDBRMNODE_H_ 29 30 #include <boost/thread.hpp> 31 #include <boost/scoped_ptr.hpp> 32 33 #include <stdint.h> 34 #include "brmtypes.h" 35 #include "lbidresourcegraph.h" 36 #include "messagequeue.h" 37 #include "bytestream.h" 38 #include "configcpp.h" 39 #include "sessionmanagerserver.h" 40 #include "oidserver.h" 41 #include "tablelockserver.h" 42 #include "autoincrementmanager.h" 43 44 namespace BRM 45 { 46 47 /** @brief The Master node of the DBRM system. 48 * 49 * There are 3 components of the Distributed BRM (DBRM). 50 * \li The interface 51 * \li The Master node 52 * \li Slave nodes 53 * 54 * The DBRM components effectively implement a networking & synchronization 55 * layer to the BlockResolutionManager class so that every node that needs 56 * BRM data always has an up-to-date copy of it locally. An operation that changes 57 * BRM data is duplicated on all hosts that run a Slave node so that every 58 * node has identical copies. All "read" operations are satisfied locally. 59 * 60 * The MasterDBRMNode class implements the Master node. All changes to BRM 61 * data are serialized and distributed through the Master node. 62 * 63 * The Master node requires configuration file entries for itself and 64 * every slave it should connect to. 65 * 66 * \code 67 * <DBRM_Controller> 68 * <IPAddr> 69 * <Port> 70 * <NumWorkers>N</NumWorkers> 71 * </DBRM_Controller> 72 * <DBRM_Worker1> 73 * <IPAddr> 74 * <Port> 75 * </DBRM_Worker1> 76 * ... 77 * <DBRM_WorkerN> 78 * <IPAddr> 79 * <Port> 80 * </DBRM_WorkerN> 81 * \endcode 82 */ 83 84 constexpr size_t connectTimeoutStep = 50000; 85 86 class MasterDBRMNode 87 { 88 public: 89 MasterDBRMNode(); 90 ~MasterDBRMNode(); 91 92 /** @brief The primary function of the class. 93 * 94 * The main loop of the master node. It accepts connections from the DBRM 95 * class, receives commands, and distributes them to each slave. It returns 96 * only after stop() or the destructor is called by another thread. 97 */ 98 void run(); 99 100 /** @brief Tells the Master to shut down cleanly. 101 * 102 * Tells the Master to shut down cleanly. 103 */ 104 void stop(); 105 106 /** @brief Effectively makes the whole DBRM system stop. 107 * 108 * Grabs a lock that effectively halts all further BRM data changes. 109 * @warning Use with care. It's basically an accessor to a raw pthread_mutex. 110 */ 111 void lock(); 112 113 /** @brief Resumes DBRM functionality. 114 * 115 * Releases a lock that allows the DBRM to continue processing changes. 116 * @warning Use with care. It's basically an accessor to a raw pthread_mutex. 117 */ 118 void unlock(); 119 120 /** @brief Reload the config file and reconnect to all slaves. 121 * 122 * Drops all existing connections, reloads the config file and 123 * reconnects with all slaves. 124 * @note Doesn't work yet. Redundant anyway. 125 */ 126 void reload(); 127 128 /** @brief Sets either read/write or read-only mode 129 * 130 * Sets either read/write or read-only mode. When in read-only mode 131 * all BRM change requests will return ERR_READONLY immediately. 132 * @param ro true specifies read-only, false specifies read/write 133 */ 134 void setReadOnly(bool ro); 135 136 /** @brief Returns true if the Master is in read-only mode, false if in RW mode. 137 * 138 * @returns true if the Master is in read-only mode, false if in read-write 139 * mode 140 */ isReadOnly()141 bool isReadOnly() const 142 { 143 return readOnly; 144 } 145 /** @brief Connects to the all workers */ 146 void connectToWorkers(const size_t connectTimeoutSecs); 147 148 /** @brief Extracts number of workers and connection timeout from the config */ 149 void getNumWorkersAndTimeout(size_t& connectTimeoutSecs, 150 const std::string& methodName, 151 config::Config* config); 152 153 private: 154 155 class MsgProcessor 156 { 157 public: 158 MsgProcessor(MasterDBRMNode* master); 159 ~MsgProcessor(); 160 void operator()(); 161 private: 162 MasterDBRMNode* m; 163 }; 164 165 struct ThreadParams 166 { 167 messageqcpp::IOSocket* sock; 168 boost::thread* t; 169 }; 170 171 172 MasterDBRMNode(const MasterDBRMNode& m); 173 MasterDBRMNode& operator=(const MasterDBRMNode& m); 174 175 void initMsgQueues(config::Config* config); 176 void msgProcessor(); 177 void distribute(messageqcpp::ByteStream* msg); 178 void undo() throw(); 179 void confirm(); 180 void sendError(messageqcpp::IOSocket* dest, uint8_t err) const throw(); 181 int gatherResponses(uint8_t cmd, uint32_t msgCmdLength, 182 std::vector<messageqcpp::ByteStream*>* responses, 183 bool& readErrFlag) throw(); 184 int compareResponses(uint8_t cmd, uint32_t msgCmdLength, 185 const std::vector <messageqcpp::ByteStream*>& responses) const; 186 void finalCleanup(); 187 188 /* Commands the master executes */ 189 void doHalt(messageqcpp::IOSocket* sock); 190 void doResume(messageqcpp::IOSocket* sock); 191 void doReload(messageqcpp::IOSocket* sock); 192 void doSetReadOnly(messageqcpp::IOSocket* sock, bool b); 193 void doGetReadOnly(messageqcpp::IOSocket* sock); 194 195 /* SessionManager interface */ 196 SessionManagerServer sm; 197 void doVerID(messageqcpp::ByteStream& msg, ThreadParams* p); 198 void doGetSystemCatalog(messageqcpp::ByteStream& msg, ThreadParams* p); 199 void doSysCatVerID(messageqcpp::ByteStream& msg, ThreadParams* p); 200 void doNewTxnID(messageqcpp::ByteStream& msg, ThreadParams* p); 201 void doCommitted(messageqcpp::ByteStream& msg, ThreadParams* p); 202 void doRolledBack(messageqcpp::ByteStream& msg, ThreadParams* p); 203 void doGetTxnID(messageqcpp::ByteStream& msg, ThreadParams* p); 204 void doSIDTIDMap(messageqcpp::ByteStream& msg, ThreadParams* p); 205 void doGetShmContents(messageqcpp::ByteStream& msg, ThreadParams* p); 206 void doGetUniqueUint32(messageqcpp::ByteStream& msg, ThreadParams* p); 207 void doGetUniqueUint64(messageqcpp::ByteStream& msg, ThreadParams* p); 208 void doGetSystemState(messageqcpp::ByteStream& msg, ThreadParams* p); 209 void doSetSystemState(messageqcpp::ByteStream& msg, ThreadParams* p); 210 void doClearSystemState(messageqcpp::ByteStream& msg, ThreadParams* p); 211 void doSessionManagerReset(messageqcpp::ByteStream& msg, ThreadParams* p); 212 void doGetUncommittedLbids(messageqcpp::ByteStream& msg, ThreadParams* p); 213 214 215 /* OID Manager interface */ 216 OIDServer oids; 217 boost::mutex oidsMutex; 218 void doAllocOIDs(messageqcpp::ByteStream& msg, ThreadParams* p); 219 void doReturnOIDs(messageqcpp::ByteStream& msg, ThreadParams* p); 220 void doOidmSize(messageqcpp::ByteStream& msg, ThreadParams* p); 221 void doAllocVBOID(messageqcpp::ByteStream& msg, ThreadParams* p); 222 void doGetDBRootOfVBOID(messageqcpp::ByteStream& msg, ThreadParams* p); 223 void doGetVBOIDToDBRootMap(messageqcpp::ByteStream& msg, ThreadParams* p); 224 225 /* Table lock interface */ 226 boost::scoped_ptr<TableLockServer> tableLockServer; 227 void doGetTableLock(messageqcpp::ByteStream& msg, ThreadParams* p); 228 void doReleaseTableLock(messageqcpp::ByteStream& msg, ThreadParams* p); 229 void doChangeTableLockState(messageqcpp::ByteStream& msg, ThreadParams* p); 230 void doChangeTableLockOwner(messageqcpp::ByteStream& msg, ThreadParams* p); 231 void doGetAllTableLocks(messageqcpp::ByteStream& msg, ThreadParams* p); 232 void doReleaseAllTableLocks(messageqcpp::ByteStream& msg, ThreadParams* p); 233 void doGetTableLockInfo(messageqcpp::ByteStream& msg, ThreadParams* p); 234 void doOwnerCheck(messageqcpp::ByteStream& msg, ThreadParams* p); 235 236 /* Autoincrement interface */ 237 AutoincrementManager aiManager; 238 void doStartAISequence(messageqcpp::ByteStream& msg, ThreadParams* p); 239 void doGetAIRange(messageqcpp::ByteStream& msg, ThreadParams* p); 240 void doResetAISequence(messageqcpp::ByteStream& msg, ThreadParams* p); 241 void doGetAILock(messageqcpp::ByteStream& msg, ThreadParams* p); 242 void doReleaseAILock(messageqcpp::ByteStream& msg, ThreadParams* p); 243 void doDeleteAISequence(messageqcpp::ByteStream& msg, ThreadParams* p); 244 245 messageqcpp::MessageQueueServer* dbrmServer; 246 std::vector<messageqcpp::MessageQueueClient*> slaves; 247 std::vector<messageqcpp::MessageQueueClient*>::iterator iSlave; 248 std::vector<messageqcpp::IOSocket*> activeSessions; 249 250 LBIDResourceGraph* rg; 251 252 boost::mutex mutex; 253 boost::mutex mutex2; // protects params and the hand-off TODO: simplify 254 boost::mutex slaveLock; // syncs communication with the slaves 255 boost::mutex serverLock; // kludge to synchronize reloading 256 int runners, NumWorkers; 257 ThreadParams* params; 258 volatile bool die, halting; 259 bool reloadCmd; 260 mutable bool readOnly; 261 struct timespec MSG_TIMEOUT; 262 }; 263 264 } 265 266 #endif 267