1 /* Copyright (C) 2014 InfiniDB, Inc.
2 
3    This program is free software; you can redistribute it and/or
4    modify it under the terms of the GNU General Public License
5    as published by the Free Software Foundation; version 2 of
6    the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
16    MA 02110-1301, USA. */
17 
18 /******************************************************************************
19  * $Id: masterdbrmnode.h 1823 2013-01-21 14:13:09Z rdempsey $
20  *
21  *****************************************************************************/
22 
23 /** @file
24  * class MasterDBRMNode interface
25  */
26 
27 #ifndef MASTERDBRMNODE_H_
28 #define MASTERDBRMNODE_H_
29 
#include <boost/thread.hpp>
#include <boost/scoped_ptr.hpp>

#include <atomic>
#include <stdint.h>
#include "brmtypes.h"
#include "lbidresourcegraph.h"
#include "messagequeue.h"
#include "bytestream.h"
#include "configcpp.h"
#include "sessionmanagerserver.h"
#include "oidserver.h"
#include "tablelockserver.h"
#include "autoincrementmanager.h"
43 
44 namespace BRM
45 {
46 
47 /** @brief The Master node of the DBRM system.
48  *
49  * There are 3 components of the Distributed BRM (DBRM).
50  * \li The interface
51  * \li The Master node
52  * \li Slave nodes
53  *
54  * The DBRM components effectively implement a networking & synchronization
55  * layer to the BlockResolutionManager class so that every node that needs
56  * BRM data always has an up-to-date copy of it locally.  An operation that changes
57  * BRM data is duplicated on all hosts that run a Slave node so that every
58  * node has identical copies.  All "read" operations are satisfied locally.
59  *
60  * The MasterDBRMNode class implements the Master node.  All changes to BRM
61  * data are serialized and distributed through the Master node.
62  *
63  * The Master node requires configuration file entries for itself and
64  * every slave it should connect to.
65  *
66  * \code
67  * <DBRM_Controller>
68  * 	<IPAddr>
69  *	<Port>
70  * 	<NumWorkers>N</NumWorkers>
71  * </DBRM_Controller>
72  * <DBRM_Worker1>
73  *	<IPAddr>
74  *	<Port>
75  * </DBRM_Worker1>
76  *	...
77  * <DBRM_WorkerN>
78  *	<IPAddr>
79  *	<Port>
80  * </DBRM_WorkerN>
81  * \endcode
82  */
83 
84 constexpr size_t connectTimeoutStep = 50000;
85 
86 class MasterDBRMNode
87 {
88 public:
89     MasterDBRMNode();
90     ~MasterDBRMNode();
91 
92     /** @brief The primary function of the class.
93      *
94      * The main loop of the master node.  It accepts connections from the DBRM
95      * class, receives commands, and distributes them to each slave.  It returns
96      * only after stop() or the destructor is called by another thread.
97      */
98     void run();
99 
100     /** @brief Tells the Master to shut down cleanly.
101      *
102      * Tells the Master to shut down cleanly.
103      */
104     void stop();
105 
106     /** @brief Effectively makes the whole DBRM system stop.
107      *
108      * Grabs a lock that effectively halts all further BRM data changes.
109      * @warning Use with care.  It's basically an accessor to a raw pthread_mutex.
110      */
111     void lock();
112 
113     /** @brief Resumes DBRM functionality.
114      *
115      * Releases a lock that allows the DBRM to continue processing changes.
116      * @warning Use with care.  It's basically an accessor to a raw pthread_mutex.
117      */
118     void unlock();
119 
120     /** @brief Reload the config file and reconnect to all slaves.
121      *
122      * Drops all existing connections, reloads the config file and
123      * reconnects with all slaves.
124      * @note Doesn't work yet.  Redundant anyway.
125      */
126     void reload();
127 
128     /** @brief Sets either read/write or read-only mode
129      *
130      * Sets either read/write or read-only mode.  When in read-only mode
131      * all BRM change requests will return ERR_READONLY immediately.
132      * @param ro true specifies read-only, false specifies read/write
133      */
134     void setReadOnly(bool ro);
135 
136     /** @brief Returns true if the Master is in read-only mode, false if in RW mode.
137      *
138      * @returns true if the Master is in read-only mode, false if in read-write
139      * mode
140      */
isReadOnly()141     bool isReadOnly() const
142     {
143         return readOnly;
144     }
145     /** @brief Connects to the all workers */
146     void connectToWorkers(const size_t connectTimeoutSecs);
147 
148     /** @brief Extracts number of workers and connection timeout from the config */
149     void getNumWorkersAndTimeout(size_t& connectTimeoutSecs,
150                                  const std::string& methodName,
151                                  config::Config* config);
152 
153 private:
154 
155     class MsgProcessor
156     {
157     public:
158         MsgProcessor(MasterDBRMNode* master);
159         ~MsgProcessor();
160         void operator()();
161     private:
162         MasterDBRMNode* m;
163     };
164 
165     struct ThreadParams
166     {
167         messageqcpp::IOSocket* sock;
168         boost::thread* t;
169     };
170 
171 
172     MasterDBRMNode(const MasterDBRMNode& m);
173     MasterDBRMNode& operator=(const MasterDBRMNode& m);
174 
175     void initMsgQueues(config::Config* config);
176     void msgProcessor();
177     void distribute(messageqcpp::ByteStream* msg);
178     void undo() throw();
179     void confirm();
180     void sendError(messageqcpp::IOSocket* dest, uint8_t err) const throw();
181     int gatherResponses(uint8_t cmd, uint32_t msgCmdLength,
182                         std::vector<messageqcpp::ByteStream*>* responses,
183                         bool& readErrFlag) throw();
184     int compareResponses(uint8_t cmd, uint32_t msgCmdLength,
185                          const std::vector <messageqcpp::ByteStream*>& responses) const;
186     void finalCleanup();
187 
188     /* Commands the master executes */
189     void doHalt(messageqcpp::IOSocket* sock);
190     void doResume(messageqcpp::IOSocket* sock);
191     void doReload(messageqcpp::IOSocket* sock);
192     void doSetReadOnly(messageqcpp::IOSocket* sock, bool b);
193     void doGetReadOnly(messageqcpp::IOSocket* sock);
194 
195     /* SessionManager interface */
196     SessionManagerServer sm;
197     void doVerID(messageqcpp::ByteStream& msg, ThreadParams* p);
198     void doGetSystemCatalog(messageqcpp::ByteStream& msg, ThreadParams* p);
199     void doSysCatVerID(messageqcpp::ByteStream& msg, ThreadParams* p);
200     void doNewTxnID(messageqcpp::ByteStream& msg, ThreadParams* p);
201     void doCommitted(messageqcpp::ByteStream& msg, ThreadParams* p);
202     void doRolledBack(messageqcpp::ByteStream& msg, ThreadParams* p);
203     void doGetTxnID(messageqcpp::ByteStream& msg, ThreadParams* p);
204     void doSIDTIDMap(messageqcpp::ByteStream& msg, ThreadParams* p);
205     void doGetShmContents(messageqcpp::ByteStream& msg, ThreadParams* p);
206     void doGetUniqueUint32(messageqcpp::ByteStream& msg, ThreadParams* p);
207     void doGetUniqueUint64(messageqcpp::ByteStream& msg, ThreadParams* p);
208     void doGetSystemState(messageqcpp::ByteStream& msg, ThreadParams* p);
209     void doSetSystemState(messageqcpp::ByteStream& msg, ThreadParams* p);
210     void doClearSystemState(messageqcpp::ByteStream& msg, ThreadParams* p);
211     void doSessionManagerReset(messageqcpp::ByteStream& msg, ThreadParams* p);
212     void doGetUncommittedLbids(messageqcpp::ByteStream& msg, ThreadParams* p);
213 
214 
215     /* OID Manager interface */
216     OIDServer oids;
217     boost::mutex oidsMutex;
218     void doAllocOIDs(messageqcpp::ByteStream& msg, ThreadParams* p);
219     void doReturnOIDs(messageqcpp::ByteStream& msg, ThreadParams* p);
220     void doOidmSize(messageqcpp::ByteStream& msg, ThreadParams* p);
221     void doAllocVBOID(messageqcpp::ByteStream& msg, ThreadParams* p);
222     void doGetDBRootOfVBOID(messageqcpp::ByteStream& msg, ThreadParams* p);
223     void doGetVBOIDToDBRootMap(messageqcpp::ByteStream& msg, ThreadParams* p);
224 
225     /* Table lock interface */
226     boost::scoped_ptr<TableLockServer> tableLockServer;
227     void doGetTableLock(messageqcpp::ByteStream& msg, ThreadParams* p);
228     void doReleaseTableLock(messageqcpp::ByteStream& msg, ThreadParams* p);
229     void doChangeTableLockState(messageqcpp::ByteStream& msg, ThreadParams* p);
230     void doChangeTableLockOwner(messageqcpp::ByteStream& msg, ThreadParams* p);
231     void doGetAllTableLocks(messageqcpp::ByteStream& msg, ThreadParams* p);
232     void doReleaseAllTableLocks(messageqcpp::ByteStream& msg, ThreadParams* p);
233     void doGetTableLockInfo(messageqcpp::ByteStream& msg, ThreadParams* p);
234     void doOwnerCheck(messageqcpp::ByteStream& msg, ThreadParams* p);
235 
236     /* Autoincrement interface */
237     AutoincrementManager aiManager;
238     void doStartAISequence(messageqcpp::ByteStream& msg, ThreadParams* p);
239     void doGetAIRange(messageqcpp::ByteStream& msg, ThreadParams* p);
240     void doResetAISequence(messageqcpp::ByteStream& msg, ThreadParams* p);
241     void doGetAILock(messageqcpp::ByteStream& msg, ThreadParams* p);
242     void doReleaseAILock(messageqcpp::ByteStream& msg, ThreadParams* p);
243     void doDeleteAISequence(messageqcpp::ByteStream& msg, ThreadParams* p);
244 
245     messageqcpp::MessageQueueServer* dbrmServer;
246     std::vector<messageqcpp::MessageQueueClient*> slaves;
247     std::vector<messageqcpp::MessageQueueClient*>::iterator iSlave;
248     std::vector<messageqcpp::IOSocket*> activeSessions;
249 
250     LBIDResourceGraph* rg;
251 
252     boost::mutex mutex;
253     boost::mutex mutex2;		// protects params and the hand-off  TODO: simplify
254     boost::mutex slaveLock;	// syncs communication with the slaves
255     boost::mutex serverLock;	// kludge to synchronize reloading
256     int runners, NumWorkers;
257     ThreadParams* params;
258     volatile bool die, halting;
259     bool reloadCmd;
260     mutable bool readOnly;
261     struct timespec MSG_TIMEOUT;
262 };
263 
264 }
265 
266 #endif
267