1 /* Copyright (c) 2003-2006 MySQL AB
2    Use is subject to license terms
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA */
16 
17 
18 #include <ndb_global.h>
19 
20 #include "SHM_Transporter.hpp"
21 #include "TransporterInternalDefinitions.hpp"
22 #include <TransporterCallback.hpp>
23 #include <NdbSleep.h>
24 #include <NdbOut.hpp>
25 
26 #include <InputStream.hpp>
27 #include <OutputStream.hpp>
28 
29 extern int g_ndb_shm_signum;
30 
SHM_Transporter(TransporterRegistry & t_reg,const char * lHostName,const char * rHostName,int r_port,bool isMgmConnection_arg,NodeId lNodeId,NodeId rNodeId,NodeId serverNodeId,bool checksum,bool signalId,key_t _shmKey,Uint32 _shmSize)31 SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg,
32 				 const char *lHostName,
33 				 const char *rHostName,
34 				 int r_port,
35 				 bool isMgmConnection_arg,
36 				 NodeId lNodeId,
37 				 NodeId rNodeId,
38 				 NodeId serverNodeId,
39 				 bool checksum,
40 				 bool signalId,
41 				 key_t _shmKey,
42 				 Uint32 _shmSize) :
43   Transporter(t_reg, tt_SHM_TRANSPORTER,
44 	      lHostName, rHostName, r_port, isMgmConnection_arg,
45 	      lNodeId, rNodeId, serverNodeId,
46 	      0, false, checksum, signalId),
47   shmKey(_shmKey),
48   shmSize(_shmSize)
49 {
50 #ifndef NDB_WIN32
51   shmId= 0;
52 #endif
53   _shmSegCreated = false;
54   _attached = false;
55 
56   shmBuf = 0;
57   reader = 0;
58   writer = 0;
59 
60   setupBuffersDone=false;
61 #ifdef DEBUG_TRANSPORTER
62   printf("shm key (%d - %d) = %d\n", lNodeId, rNodeId, shmKey);
63 #endif
64   m_signal_threshold = 4096;
65 }
66 
~SHM_Transporter()67 SHM_Transporter::~SHM_Transporter(){
68   doDisconnect();
69 }
70 
71 bool
initTransporter()72 SHM_Transporter::initTransporter(){
73   if (g_ndb_shm_signum)
74     return true;
75   return false;
76 }
77 
78 void
setupBuffers()79 SHM_Transporter::setupBuffers(){
80   Uint32 sharedSize = 0;
81   sharedSize += 28; //SHM_Reader::getSharedSize();
82   sharedSize += 28; //SHM_Writer::getSharedSize();
83 
84   const Uint32 slack = MAX_MESSAGE_SIZE;
85 
86   /**
87    *  NOTE: There is 7th shared variable in Win2k (sharedCountAttached).
88    */
89   Uint32 sizeOfBuffer = shmSize;
90   sizeOfBuffer -= 2*sharedSize;
91   sizeOfBuffer /= 2;
92 
93   Uint32 * base1 = (Uint32*)shmBuf;
94 
95   Uint32 * sharedReadIndex1 = base1;
96   Uint32 * sharedWriteIndex1 = base1 + 1;
97   serverStatusFlag = base1 + 4;
98   char * startOfBuf1 = shmBuf+sharedSize;
99 
100   Uint32 * base2 = (Uint32*)(shmBuf + sizeOfBuffer + sharedSize);
101   Uint32 * sharedReadIndex2 = base2;
102   Uint32 * sharedWriteIndex2 = base2 + 1;
103   clientStatusFlag = base2 + 4;
104   char * startOfBuf2 = ((char *)base2)+sharedSize;
105 
106   if(isServer){
107     * serverStatusFlag = 0;
108     reader = new SHM_Reader(startOfBuf1,
109 			    sizeOfBuffer,
110 			    slack,
111 			    sharedReadIndex1,
112 			    sharedWriteIndex1);
113 
114     writer = new SHM_Writer(startOfBuf2,
115 			    sizeOfBuffer,
116 			    slack,
117 			    sharedReadIndex2,
118 			    sharedWriteIndex2);
119 
120     * sharedReadIndex1 = 0;
121     * sharedWriteIndex1 = 0;
122 
123     * sharedReadIndex2 = 0;
124     * sharedWriteIndex2 = 0;
125 
126     reader->clear();
127     writer->clear();
128 
129     * serverStatusFlag = 1;
130 
131 #ifdef DEBUG_TRANSPORTER
132     printf("-- (%d - %d) - Server -\n", localNodeId, remoteNodeId);
133     printf("Reader at: %d (%p)\n", startOfBuf1 - shmBuf, startOfBuf1);
134     printf("sharedReadIndex1 at %d (%p) = %d\n",
135 	   (char*)sharedReadIndex1-shmBuf,
136 	   sharedReadIndex1, *sharedReadIndex1);
137     printf("sharedWriteIndex1 at %d (%p) = %d\n",
138 	   (char*)sharedWriteIndex1-shmBuf,
139 	   sharedWriteIndex1, *sharedWriteIndex1);
140 
141     printf("Writer at: %d (%p)\n", startOfBuf2 - shmBuf, startOfBuf2);
142     printf("sharedReadIndex2 at %d (%p) = %d\n",
143 	   (char*)sharedReadIndex2-shmBuf,
144 	   sharedReadIndex2, *sharedReadIndex2);
145     printf("sharedWriteIndex2 at %d (%p) = %d\n",
146 	   (char*)sharedWriteIndex2-shmBuf,
147 	   sharedWriteIndex2, *sharedWriteIndex2);
148 
149     printf("sizeOfBuffer = %d\n", sizeOfBuffer);
150 #endif
151   } else {
152     * clientStatusFlag = 0;
153     reader = new SHM_Reader(startOfBuf2,
154 			    sizeOfBuffer,
155 			    slack,
156 			    sharedReadIndex2,
157 			    sharedWriteIndex2);
158 
159     writer = new SHM_Writer(startOfBuf1,
160 			    sizeOfBuffer,
161 			    slack,
162 			    sharedReadIndex1,
163 			    sharedWriteIndex1);
164 
165     * sharedReadIndex2 = 0;
166     * sharedWriteIndex1 = 0;
167 
168     reader->clear();
169     writer->clear();
170     * clientStatusFlag = 1;
171 #ifdef DEBUG_TRANSPORTER
172     printf("-- (%d - %d) - Client -\n", localNodeId, remoteNodeId);
173     printf("Reader at: %d (%p)\n", startOfBuf2 - shmBuf, startOfBuf2);
174     printf("sharedReadIndex2 at %d (%p) = %d\n",
175 	   (char*)sharedReadIndex2-shmBuf,
176 	   sharedReadIndex2, *sharedReadIndex2);
177     printf("sharedWriteIndex2 at %d (%p) = %d\n",
178 	   (char*)sharedWriteIndex2-shmBuf,
179 	   sharedWriteIndex2, *sharedWriteIndex2);
180 
181     printf("Writer at: %d (%p)\n", startOfBuf1 - shmBuf, startOfBuf1);
182     printf("sharedReadIndex1 at %d (%p) = %d\n",
183 	   (char*)sharedReadIndex1-shmBuf,
184 	   sharedReadIndex1, *sharedReadIndex1);
185     printf("sharedWriteIndex1 at %d (%p) = %d\n",
186 	   (char*)sharedWriteIndex1-shmBuf,
187 	   sharedWriteIndex1, *sharedWriteIndex1);
188 
189     printf("sizeOfBuffer = %d\n", sizeOfBuffer);
190 #endif
191   }
192 #ifdef DEBUG_TRANSPORTER
193   printf("Mapping from %p to %p\n", shmBuf, shmBuf+shmSize);
194 #endif
195 }
196 
197 bool
connect_server_impl(NDB_SOCKET_TYPE sockfd)198 SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
199 {
200   DBUG_ENTER("SHM_Transporter::connect_server_impl");
201   SocketOutputStream s_output(sockfd);
202   SocketInputStream s_input(sockfd);
203   char buf[256];
204 
205   // Create
206   if(!_shmSegCreated){
207     if (!ndb_shm_create()) {
208       make_error_info(buf, sizeof(buf));
209       report_error(TE_SHM_UNABLE_TO_CREATE_SEGMENT, buf);
210       NDB_CLOSE_SOCKET(sockfd);
211       DBUG_RETURN(false);
212     }
213     _shmSegCreated = true;
214   }
215 
216   // Attach
217   if(!_attached){
218     if (!ndb_shm_attach()) {
219       make_error_info(buf, sizeof(buf));
220       report_error(TE_SHM_UNABLE_TO_ATTACH_SEGMENT, buf);
221       NDB_CLOSE_SOCKET(sockfd);
222       DBUG_RETURN(false);
223     }
224     _attached = true;
225   }
226 
227   // Send ok to client
228   s_output.println("shm server 1 ok: %d",
229 		   m_transporter_registry.m_shm_own_pid);
230 
231   // Wait for ok from client
232   DBUG_PRINT("info", ("Wait for ok from client"));
233   if (s_input.gets(buf, sizeof(buf)) == 0)
234   {
235     NDB_CLOSE_SOCKET(sockfd);
236     DBUG_RETURN(false);
237   }
238 
239   if(sscanf(buf, "shm client 1 ok: %d", &m_remote_pid) != 1)
240   {
241     NDB_CLOSE_SOCKET(sockfd);
242     DBUG_RETURN(false);
243   }
244 
245   int r= connect_common(sockfd);
246 
247   if (r) {
248     // Send ok to client
249     s_output.println("shm server 2 ok");
250     // Wait for ok from client
251     if (s_input.gets(buf, 256) == 0) {
252       NDB_CLOSE_SOCKET(sockfd);
253       DBUG_RETURN(false);
254     }
255     DBUG_PRINT("info", ("Successfully connected server to node %d",
256                 remoteNodeId));
257   }
258 
259   NDB_CLOSE_SOCKET(sockfd);
260   DBUG_RETURN(r);
261 }
262 
263 bool
connect_client_impl(NDB_SOCKET_TYPE sockfd)264 SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
265 {
266   DBUG_ENTER("SHM_Transporter::connect_client_impl");
267   SocketInputStream s_input(sockfd);
268   SocketOutputStream s_output(sockfd);
269   char buf[256];
270 
271   // Wait for server to create and attach
272   DBUG_PRINT("info", ("Wait for server to create and attach"));
273   if (s_input.gets(buf, 256) == 0) {
274     NDB_CLOSE_SOCKET(sockfd);
275     DBUG_PRINT("error", ("Server id %d did not attach",
276                 remoteNodeId));
277     DBUG_RETURN(false);
278   }
279 
280   if(sscanf(buf, "shm server 1 ok: %d", &m_remote_pid) != 1)
281   {
282     NDB_CLOSE_SOCKET(sockfd);
283     DBUG_RETURN(false);
284   }
285 
286   // Create
287   if(!_shmSegCreated){
288     if (!ndb_shm_get()) {
289       NDB_CLOSE_SOCKET(sockfd);
290       DBUG_PRINT("error", ("Failed create of shm seg to node %d",
291                   remoteNodeId));
292       DBUG_RETURN(false);
293     }
294     _shmSegCreated = true;
295   }
296 
297   // Attach
298   if(!_attached){
299     if (!ndb_shm_attach()) {
300       make_error_info(buf, sizeof(buf));
301       report_error(TE_SHM_UNABLE_TO_ATTACH_SEGMENT, buf);
302       NDB_CLOSE_SOCKET(sockfd);
303       DBUG_PRINT("error", ("Failed attach of shm seg to node %d",
304                   remoteNodeId));
305       DBUG_RETURN(false);
306     }
307     _attached = true;
308   }
309 
310   // Send ok to server
311   s_output.println("shm client 1 ok: %d",
312 		   m_transporter_registry.m_shm_own_pid);
313 
314   int r= connect_common(sockfd);
315 
316   if (r) {
317     // Wait for ok from server
318     DBUG_PRINT("info", ("Wait for ok from server"));
319     if (s_input.gets(buf, 256) == 0) {
320       NDB_CLOSE_SOCKET(sockfd);
321       DBUG_PRINT("error", ("No ok from server node %d",
322                   remoteNodeId));
323       DBUG_RETURN(false);
324     }
325     // Send ok to server
326     s_output.println("shm client 2 ok");
327     DBUG_PRINT("info", ("Successfully connected client to node %d",
328                 remoteNodeId));
329   }
330 
331   NDB_CLOSE_SOCKET(sockfd);
332   DBUG_RETURN(r);
333 }
334 
335 bool
connect_common(NDB_SOCKET_TYPE sockfd)336 SHM_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
337 {
338   if (!checkConnected()) {
339     return false;
340   }
341 
342   if(!setupBuffersDone)
343   {
344     setupBuffers();
345     setupBuffersDone=true;
346   }
347 
348   if(setupBuffersDone)
349   {
350     NdbSleep_MilliSleep(m_timeOutMillis);
351     if(*serverStatusFlag == 1 && *clientStatusFlag == 1)
352     {
353       m_last_signal = 0;
354       return true;
355     }
356   }
357 
358   DBUG_PRINT("error", ("Failed to set up buffers to node %d",
359               remoteNodeId));
360   return false;
361 }
362 
363 void
doSend()364 SHM_Transporter::doSend()
365 {
366   if(m_last_signal)
367   {
368     m_last_signal = 0;
369     kill(m_remote_pid, g_ndb_shm_signum);
370   }
371 }
372 
373 Uint32
get_free_buffer() const374 SHM_Transporter::get_free_buffer() const
375 {
376   return writer->get_free_buffer();
377 }
378