1 /* Copyright (c) 2003-2006 MySQL AB
2 Use is subject to license terms
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; version 2 of the License.
7
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12
13 You should have received a copy of the GNU General Public License
14 along with this program; if not, write to the Free Software
15 Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */
16
17
18 #include <ndb_global.h>
19
20 #include "SHM_Transporter.hpp"
21 #include "TransporterInternalDefinitions.hpp"
22 #include <TransporterCallback.hpp>
23 #include <NdbSleep.h>
24 #include <NdbOut.hpp>
25
26 #include <InputStream.hpp>
27 #include <OutputStream.hpp>
28
29 extern int g_ndb_shm_signum;
30
SHM_Transporter(TransporterRegistry & t_reg,const char * lHostName,const char * rHostName,int r_port,bool isMgmConnection_arg,NodeId lNodeId,NodeId rNodeId,NodeId serverNodeId,bool checksum,bool signalId,key_t _shmKey,Uint32 _shmSize)31 SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg,
32 const char *lHostName,
33 const char *rHostName,
34 int r_port,
35 bool isMgmConnection_arg,
36 NodeId lNodeId,
37 NodeId rNodeId,
38 NodeId serverNodeId,
39 bool checksum,
40 bool signalId,
41 key_t _shmKey,
42 Uint32 _shmSize) :
43 Transporter(t_reg, tt_SHM_TRANSPORTER,
44 lHostName, rHostName, r_port, isMgmConnection_arg,
45 lNodeId, rNodeId, serverNodeId,
46 0, false, checksum, signalId),
47 shmKey(_shmKey),
48 shmSize(_shmSize)
49 {
50 #ifndef NDB_WIN32
51 shmId= 0;
52 #endif
53 _shmSegCreated = false;
54 _attached = false;
55
56 shmBuf = 0;
57 reader = 0;
58 writer = 0;
59
60 setupBuffersDone=false;
61 #ifdef DEBUG_TRANSPORTER
62 printf("shm key (%d - %d) = %d\n", lNodeId, rNodeId, shmKey);
63 #endif
64 m_signal_threshold = 4096;
65 }
66
~SHM_Transporter()67 SHM_Transporter::~SHM_Transporter(){
68 doDisconnect();
69 }
70
71 bool
initTransporter()72 SHM_Transporter::initTransporter(){
73 if (g_ndb_shm_signum)
74 return true;
75 return false;
76 }
77
78 void
setupBuffers()79 SHM_Transporter::setupBuffers(){
80 Uint32 sharedSize = 0;
81 sharedSize += 28; //SHM_Reader::getSharedSize();
82 sharedSize += 28; //SHM_Writer::getSharedSize();
83
84 const Uint32 slack = MAX_MESSAGE_SIZE;
85
86 /**
87 * NOTE: There is 7th shared variable in Win2k (sharedCountAttached).
88 */
89 Uint32 sizeOfBuffer = shmSize;
90 sizeOfBuffer -= 2*sharedSize;
91 sizeOfBuffer /= 2;
92
93 Uint32 * base1 = (Uint32*)shmBuf;
94
95 Uint32 * sharedReadIndex1 = base1;
96 Uint32 * sharedWriteIndex1 = base1 + 1;
97 serverStatusFlag = base1 + 4;
98 char * startOfBuf1 = shmBuf+sharedSize;
99
100 Uint32 * base2 = (Uint32*)(shmBuf + sizeOfBuffer + sharedSize);
101 Uint32 * sharedReadIndex2 = base2;
102 Uint32 * sharedWriteIndex2 = base2 + 1;
103 clientStatusFlag = base2 + 4;
104 char * startOfBuf2 = ((char *)base2)+sharedSize;
105
106 if(isServer){
107 * serverStatusFlag = 0;
108 reader = new SHM_Reader(startOfBuf1,
109 sizeOfBuffer,
110 slack,
111 sharedReadIndex1,
112 sharedWriteIndex1);
113
114 writer = new SHM_Writer(startOfBuf2,
115 sizeOfBuffer,
116 slack,
117 sharedReadIndex2,
118 sharedWriteIndex2);
119
120 * sharedReadIndex1 = 0;
121 * sharedWriteIndex1 = 0;
122
123 * sharedReadIndex2 = 0;
124 * sharedWriteIndex2 = 0;
125
126 reader->clear();
127 writer->clear();
128
129 * serverStatusFlag = 1;
130
131 #ifdef DEBUG_TRANSPORTER
132 printf("-- (%d - %d) - Server -\n", localNodeId, remoteNodeId);
133 printf("Reader at: %d (%p)\n", startOfBuf1 - shmBuf, startOfBuf1);
134 printf("sharedReadIndex1 at %d (%p) = %d\n",
135 (char*)sharedReadIndex1-shmBuf,
136 sharedReadIndex1, *sharedReadIndex1);
137 printf("sharedWriteIndex1 at %d (%p) = %d\n",
138 (char*)sharedWriteIndex1-shmBuf,
139 sharedWriteIndex1, *sharedWriteIndex1);
140
141 printf("Writer at: %d (%p)\n", startOfBuf2 - shmBuf, startOfBuf2);
142 printf("sharedReadIndex2 at %d (%p) = %d\n",
143 (char*)sharedReadIndex2-shmBuf,
144 sharedReadIndex2, *sharedReadIndex2);
145 printf("sharedWriteIndex2 at %d (%p) = %d\n",
146 (char*)sharedWriteIndex2-shmBuf,
147 sharedWriteIndex2, *sharedWriteIndex2);
148
149 printf("sizeOfBuffer = %d\n", sizeOfBuffer);
150 #endif
151 } else {
152 * clientStatusFlag = 0;
153 reader = new SHM_Reader(startOfBuf2,
154 sizeOfBuffer,
155 slack,
156 sharedReadIndex2,
157 sharedWriteIndex2);
158
159 writer = new SHM_Writer(startOfBuf1,
160 sizeOfBuffer,
161 slack,
162 sharedReadIndex1,
163 sharedWriteIndex1);
164
165 * sharedReadIndex2 = 0;
166 * sharedWriteIndex1 = 0;
167
168 reader->clear();
169 writer->clear();
170 * clientStatusFlag = 1;
171 #ifdef DEBUG_TRANSPORTER
172 printf("-- (%d - %d) - Client -\n", localNodeId, remoteNodeId);
173 printf("Reader at: %d (%p)\n", startOfBuf2 - shmBuf, startOfBuf2);
174 printf("sharedReadIndex2 at %d (%p) = %d\n",
175 (char*)sharedReadIndex2-shmBuf,
176 sharedReadIndex2, *sharedReadIndex2);
177 printf("sharedWriteIndex2 at %d (%p) = %d\n",
178 (char*)sharedWriteIndex2-shmBuf,
179 sharedWriteIndex2, *sharedWriteIndex2);
180
181 printf("Writer at: %d (%p)\n", startOfBuf1 - shmBuf, startOfBuf1);
182 printf("sharedReadIndex1 at %d (%p) = %d\n",
183 (char*)sharedReadIndex1-shmBuf,
184 sharedReadIndex1, *sharedReadIndex1);
185 printf("sharedWriteIndex1 at %d (%p) = %d\n",
186 (char*)sharedWriteIndex1-shmBuf,
187 sharedWriteIndex1, *sharedWriteIndex1);
188
189 printf("sizeOfBuffer = %d\n", sizeOfBuffer);
190 #endif
191 }
192 #ifdef DEBUG_TRANSPORTER
193 printf("Mapping from %p to %p\n", shmBuf, shmBuf+shmSize);
194 #endif
195 }
196
197 bool
connect_server_impl(NDB_SOCKET_TYPE sockfd)198 SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
199 {
200 DBUG_ENTER("SHM_Transporter::connect_server_impl");
201 SocketOutputStream s_output(sockfd);
202 SocketInputStream s_input(sockfd);
203 char buf[256];
204
205 // Create
206 if(!_shmSegCreated){
207 if (!ndb_shm_create()) {
208 make_error_info(buf, sizeof(buf));
209 report_error(TE_SHM_UNABLE_TO_CREATE_SEGMENT, buf);
210 NDB_CLOSE_SOCKET(sockfd);
211 DBUG_RETURN(false);
212 }
213 _shmSegCreated = true;
214 }
215
216 // Attach
217 if(!_attached){
218 if (!ndb_shm_attach()) {
219 make_error_info(buf, sizeof(buf));
220 report_error(TE_SHM_UNABLE_TO_ATTACH_SEGMENT, buf);
221 NDB_CLOSE_SOCKET(sockfd);
222 DBUG_RETURN(false);
223 }
224 _attached = true;
225 }
226
227 // Send ok to client
228 s_output.println("shm server 1 ok: %d",
229 m_transporter_registry.m_shm_own_pid);
230
231 // Wait for ok from client
232 DBUG_PRINT("info", ("Wait for ok from client"));
233 if (s_input.gets(buf, sizeof(buf)) == 0)
234 {
235 NDB_CLOSE_SOCKET(sockfd);
236 DBUG_RETURN(false);
237 }
238
239 if(sscanf(buf, "shm client 1 ok: %d", &m_remote_pid) != 1)
240 {
241 NDB_CLOSE_SOCKET(sockfd);
242 DBUG_RETURN(false);
243 }
244
245 int r= connect_common(sockfd);
246
247 if (r) {
248 // Send ok to client
249 s_output.println("shm server 2 ok");
250 // Wait for ok from client
251 if (s_input.gets(buf, 256) == 0) {
252 NDB_CLOSE_SOCKET(sockfd);
253 DBUG_RETURN(false);
254 }
255 DBUG_PRINT("info", ("Successfully connected server to node %d",
256 remoteNodeId));
257 }
258
259 NDB_CLOSE_SOCKET(sockfd);
260 DBUG_RETURN(r);
261 }
262
263 bool
connect_client_impl(NDB_SOCKET_TYPE sockfd)264 SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
265 {
266 DBUG_ENTER("SHM_Transporter::connect_client_impl");
267 SocketInputStream s_input(sockfd);
268 SocketOutputStream s_output(sockfd);
269 char buf[256];
270
271 // Wait for server to create and attach
272 DBUG_PRINT("info", ("Wait for server to create and attach"));
273 if (s_input.gets(buf, 256) == 0) {
274 NDB_CLOSE_SOCKET(sockfd);
275 DBUG_PRINT("error", ("Server id %d did not attach",
276 remoteNodeId));
277 DBUG_RETURN(false);
278 }
279
280 if(sscanf(buf, "shm server 1 ok: %d", &m_remote_pid) != 1)
281 {
282 NDB_CLOSE_SOCKET(sockfd);
283 DBUG_RETURN(false);
284 }
285
286 // Create
287 if(!_shmSegCreated){
288 if (!ndb_shm_get()) {
289 NDB_CLOSE_SOCKET(sockfd);
290 DBUG_PRINT("error", ("Failed create of shm seg to node %d",
291 remoteNodeId));
292 DBUG_RETURN(false);
293 }
294 _shmSegCreated = true;
295 }
296
297 // Attach
298 if(!_attached){
299 if (!ndb_shm_attach()) {
300 make_error_info(buf, sizeof(buf));
301 report_error(TE_SHM_UNABLE_TO_ATTACH_SEGMENT, buf);
302 NDB_CLOSE_SOCKET(sockfd);
303 DBUG_PRINT("error", ("Failed attach of shm seg to node %d",
304 remoteNodeId));
305 DBUG_RETURN(false);
306 }
307 _attached = true;
308 }
309
310 // Send ok to server
311 s_output.println("shm client 1 ok: %d",
312 m_transporter_registry.m_shm_own_pid);
313
314 int r= connect_common(sockfd);
315
316 if (r) {
317 // Wait for ok from server
318 DBUG_PRINT("info", ("Wait for ok from server"));
319 if (s_input.gets(buf, 256) == 0) {
320 NDB_CLOSE_SOCKET(sockfd);
321 DBUG_PRINT("error", ("No ok from server node %d",
322 remoteNodeId));
323 DBUG_RETURN(false);
324 }
325 // Send ok to server
326 s_output.println("shm client 2 ok");
327 DBUG_PRINT("info", ("Successfully connected client to node %d",
328 remoteNodeId));
329 }
330
331 NDB_CLOSE_SOCKET(sockfd);
332 DBUG_RETURN(r);
333 }
334
335 bool
connect_common(NDB_SOCKET_TYPE sockfd)336 SHM_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
337 {
338 if (!checkConnected()) {
339 return false;
340 }
341
342 if(!setupBuffersDone)
343 {
344 setupBuffers();
345 setupBuffersDone=true;
346 }
347
348 if(setupBuffersDone)
349 {
350 NdbSleep_MilliSleep(m_timeOutMillis);
351 if(*serverStatusFlag == 1 && *clientStatusFlag == 1)
352 {
353 m_last_signal = 0;
354 return true;
355 }
356 }
357
358 DBUG_PRINT("error", ("Failed to set up buffers to node %d",
359 remoteNodeId));
360 return false;
361 }
362
363 void
doSend()364 SHM_Transporter::doSend()
365 {
366 if(m_last_signal)
367 {
368 m_last_signal = 0;
369 kill(m_remote_pid, g_ndb_shm_signum);
370 }
371 }
372
373 Uint32
get_free_buffer() const374 SHM_Transporter::get_free_buffer() const
375 {
376 return writer->get_free_buffer();
377 }
378