1 /*
2    Copyright (c) 2003, 2019, Oracle and/or its affiliates. All rights reserved.
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License, version 2.0,
6    as published by the Free Software Foundation.
7 
8    This program is also distributed with certain software (including
9    but not limited to OpenSSL) that is licensed under separate terms,
10    as designated in a particular file or component or in included license
11    documentation.  The authors of MySQL hereby grant you an additional
12    permission to link the program and your derivative works with the
13    separately licensed software that they have included with MySQL.
14 
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License, version 2.0, for more details.
19 
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
23 */
24 
25 
26 #include <ndb_global.h>
27 
28 #include "SHM_Transporter.hpp"
29 #include "TransporterInternalDefinitions.hpp"
30 #include <TransporterCallback.hpp>
31 #include <NdbSleep.h>
32 #include <NdbOut.hpp>
33 #include <NdbMutex.h>
34 #include <ndb_localtime.h>
35 
36 #include <InputStream.hpp>
37 #include <OutputStream.hpp>
38 
39 #include <EventLogger.hpp>
40 extern EventLogger * g_eventLogger;
41 
/*
 * Debug tracing for this file.  Usage: DEBUG_FPRINTF((stderr, fmt, ...));
 * the double parentheses pass a whole fprintf argument list through the
 * single macro parameter.  With the "#if 0" below the macro expands to
 * nothing; change it to "#if 1" to enable the tracing.
 */
#if 0
#define DEBUG_FPRINTF(arglist) do { fprintf arglist ; } while (0)
#else
#define DEBUG_FPRINTF(a)
#endif
47 
SHM_Transporter(TransporterRegistry & t_reg,TrpId transporter_index,const char * lHostName,const char * rHostName,int r_port,bool isMgmConnection_arg,NodeId lNodeId,NodeId rNodeId,NodeId serverNodeId,bool checksum,bool signalId,key_t _shmKey,Uint32 _shmSize,bool preSendChecksum,Uint32 _spintime,Uint32 _send_buffer_size)48 SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg,
49                                  TrpId transporter_index,
50 				 const char *lHostName,
51 				 const char *rHostName,
52 				 int r_port,
53 				 bool isMgmConnection_arg,
54 				 NodeId lNodeId,
55 				 NodeId rNodeId,
56 				 NodeId serverNodeId,
57 				 bool checksum,
58 				 bool signalId,
59 				 key_t _shmKey,
60 				 Uint32 _shmSize,
61 				 bool preSendChecksum,
62                                  Uint32 _spintime,
63                                  Uint32 _send_buffer_size) :
64   Transporter(t_reg, transporter_index, tt_SHM_TRANSPORTER,
65 	      lHostName, rHostName, r_port, isMgmConnection_arg,
66 	      lNodeId, rNodeId, serverNodeId,
67 	      0, false, checksum, signalId,
68               _send_buffer_size,
69               preSendChecksum,
70               _spintime),
71   shmKey(_shmKey),
72   shmSize(_shmSize)
73 {
74 #ifndef _WIN32
75   shmId= 0;
76 #endif
77   _shmSegCreated = false;
78   _attached = false;
79 
80   shmBuf = 0;
81   reader = 0;
82   writer = 0;
83 
84   setupBuffersDone = false;
85   m_server_locked = false;
86   m_client_locked = false;
87 #ifdef DEBUG_TRANSPORTER
88   printf("shm key (%d - %d) = %d\n", lNodeId, rNodeId, shmKey);
89 #endif
90   m_signal_threshold = 262144;
91 }
92 
SHM_Transporter(TransporterRegistry & t_reg,const SHM_Transporter * t)93 SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg,
94                                  const SHM_Transporter* t)
95   :
96   Transporter(t_reg,
97               0,
98               tt_SHM_TRANSPORTER,
99 	      t->localHostName,
100 	      t->remoteHostName,
101 	      t->m_s_port,
102 	      t->isMgmConnection,
103 	      t->localNodeId,
104 	      t->remoteNodeId,
105 	      t->isServer ? t->localNodeId : t->remoteNodeId,
106 	      0,
107               false,
108 	      t->checksumUsed,
109 	      t->signalIdUsed,
110 	      t->m_max_send_buffer,
111 	      t->check_send_checksum,
112               t->m_spintime)
113 {
114   shmKey = t->shmKey;
115   shmSize = t->shmSize;
116 #ifndef NDB_WIN32
117   shmId= 0;
118 #endif
119   _shmSegCreated = false;
120   _attached = false;
121 
122   shmBuf = 0;
123   reader = 0;
124   writer = 0;
125 
126   setupBuffersDone = false;
127   m_server_locked = false;
128   m_client_locked = false;
129 #ifdef DEBUG_TRANSPORTER
130   printf("shm key (%d - %d) = %d\n",
131          t->localNodeId, t->remoteNodeId, shmKey);
132 #endif
133   m_signal_threshold = 262144;
134   send_checksum_state.init();
135 }
136 
137 
138 bool
configure_derived(const TransporterConfiguration * conf)139 SHM_Transporter::configure_derived(const TransporterConfiguration* conf)
140 {
141   if ((key_t)conf->shm.shmKey == shmKey &&
142       (int)conf->shm.shmSize == shmSize)
143     return true; // No change
144   return false; // Can't reconfigure
145 }
146 
147 
~SHM_Transporter()148 SHM_Transporter::~SHM_Transporter()
149 {
150   DEBUG_FPRINTF((stderr, "(%u)doDisconnect(%u), line: %d\n",
151                 localNodeId, remoteNodeId, __LINE__));
152   doDisconnect();
153 }
154 
155 void
resetBuffers()156 SHM_Transporter::resetBuffers()
157 {
158   assert(!isConnected());
159   DEBUG_FPRINTF((stderr, "(%u)resetBuffers(%u), line: %d\n",
160                 localNodeId, remoteNodeId, __LINE__));
161   detach_shm(true);
162   send_checksum_state.init();
163 }
164 
165 bool
initTransporter()166 SHM_Transporter::initTransporter()
167 {
168   return true;
169 }
170 
/**
 * Lay out the attached shared memory segment (shmBuf, shmSize bytes) and
 * construct this side's SHM_Reader / SHM_Writer over it.
 *
 * The segment is split into two equal halves.  Each half starts with a
 * control area of sharedSize bytes, followed by a data area of sizeOfBuffer
 * bytes:
 *
 *   half 1 (base1): data at startOfBuf1, read by the server and
 *                   written by the client
 *   half 2 (base2): data at startOfBuf2, written by the server and
 *                   read by the client
 *
 * Control area layout, in Uint32 words from the half's base:
 *   word 0  shared read index
 *   word 1  shared write index
 *   word 4  status flag
 *   word 5  awakened flag
 *   word 6  up flag
 *   word 16 (byte offset 64)  NdbMutex (serverMutex / clientMutex)
 *
 * The server initialises both shared mutexes and the awakened/up flags.
 * The client only sets clientUpFlag, while holding serverMutex.
 *
 * Each side sets its own status flag to 0, constructs its reader and writer,
 * resets the indexes it owns (the server resets all four) and then sets its
 * status flag to 1.  connect_common() waits for both status flags to be 1.
 *
 * @return false on success; true on failure (only when initialising the
 *         shared mutexes fails).  Note the inverted convention.
 */
bool
SHM_Transporter::setupBuffers()
{
  // Control area per half: 64 bytes of Uint32 slots, then the mutex.
  Uint32 sharedSize = 0;
  sharedSize += 64;
  sharedSize += sizeof(NdbMutex);

  // Largest message size in either direction, handed to reader and writer.
  // NOTE(review): presumably headroom so a whole message always fits;
  // confirm in SHM_Buffer.
  const Uint32 slack = MAX(MAX_RECV_MESSAGE_BYTESIZE,
                           MAX_SEND_MESSAGE_BYTESIZE);

  /**
   *  NOTE: There is 7th shared variable in Win2k (sharedCountAttached).
   *  NOTE(review): historical remark; no such variable is laid out below.
   */
  // Two control areas come off the top; the rest is split into two equal
  // data areas.  NOTE(review): assumes shmSize > 2*sharedSize, otherwise
  // this unsigned subtraction wraps around.
  Uint32 sizeOfBuffer = shmSize;
  sizeOfBuffer -= 2*sharedSize;
  sizeOfBuffer /= 2;

  // Half 1: control area at the start of the segment.
  Uint32 * base1 = (Uint32*)shmBuf;

  Uint32 * sharedReadIndex1 = base1;
  Uint32 * sharedWriteIndex1 = base1 + 1;
  serverStatusFlag = base1 + 4;
  serverAwakenedFlag = base1 + 5;
  serverUpFlag = base1 + 6;
  serverMutex = (NdbMutex*)(base1 + 16);
  char * startOfBuf1 = shmBuf+sharedSize;

  // Half 2: control area right after half 1's data area.
  Uint32 * base2 = (Uint32*)(shmBuf + sizeOfBuffer + sharedSize);
  Uint32 * sharedReadIndex2 = base2;
  Uint32 * sharedWriteIndex2 = base2 + 1;
  clientStatusFlag = base2 + 4;
  clientAwakenedFlag = base2 + 5;
  clientUpFlag = base2 + 6;
  clientMutex = (NdbMutex*)(base2 + 16);
  char * startOfBuf2 = ((char *)base2)+sharedSize;

  if (isServer)
  {
    // The server owns initialisation of both process-shared mutexes.
    if ((NdbMutex_Init_Shared(serverMutex) != 0) ||
        (NdbMutex_Init_Shared(clientMutex) != 0))
    {
      return true;
    }
    * serverAwakenedFlag = 0;
    * clientAwakenedFlag = 0;
    * serverUpFlag = 1;
    * clientUpFlag = 0;
  }
  else
  {
    NdbMutex_Lock(serverMutex);
    * clientUpFlag = 1;
    NdbMutex_Unlock(serverMutex);
  }

  if (reader != 0)
  {
    DEBUG_FPRINTF((stderr, "(%u)reader = %p, m_shm_reader: %p (%u) LINE:%d",
                   localNodeId, reader, &m_shm_reader, remoteNodeId, __LINE__));
  }
  // Reader/writer are placement-constructed into member storage below and
  // must not already exist.
  assert(reader == 0);
  assert(writer == 0);
  if(isServer)
  {
    // Not ready until reader/writer exist and the indexes are reset.
    * serverStatusFlag = 0;
    reader = new (&m_shm_reader)
                 SHM_Reader(startOfBuf1,
                            sizeOfBuffer,
                            slack,
                            sharedReadIndex1,
                            sharedWriteIndex1);

    writer = new (&m_shm_writer)
                            SHM_Writer(startOfBuf2,
                            sizeOfBuffer,
                            slack,
                            sharedReadIndex2,
                            sharedWriteIndex2);

    * sharedReadIndex1 = 0;
    * sharedWriteIndex1 = 0;

    * sharedReadIndex2 = 0;
    * sharedWriteIndex2 = 0;

    * serverStatusFlag = 1;

#ifdef DEBUG_TRANSPORTER
    printf("-- (%d - %d) - Server -\n", localNodeId, remoteNodeId);
    printf("Reader at: %ld (%p)\n", startOfBuf1 - shmBuf, startOfBuf1);
    printf("sharedReadIndex1 at %ld (%p) = %d\n",
           (char*)sharedReadIndex1-shmBuf,
           sharedReadIndex1, *sharedReadIndex1);
    printf("sharedWriteIndex1 at %ld (%p) = %d\n",
           (char*)sharedWriteIndex1-shmBuf,
           sharedWriteIndex1, *sharedWriteIndex1);

    printf("Writer at: %ld (%p)\n", startOfBuf2 - shmBuf, startOfBuf2);
    printf("sharedReadIndex2 at %ld (%p) = %d\n",
           (char*)sharedReadIndex2-shmBuf,
           sharedReadIndex2, *sharedReadIndex2);
    printf("sharedWriteIndex2 at %ld (%p) = %d\n",
           (char*)sharedWriteIndex2-shmBuf,
           sharedWriteIndex2, *sharedWriteIndex2);

    printf("sizeOfBuffer = %d\n", sizeOfBuffer);
#endif
  }
  else
  {
    // Client reads half 2 and writes half 1.
    * clientStatusFlag = 0;
    reader = new (&m_shm_reader)
                 SHM_Reader(startOfBuf2,
                            sizeOfBuffer,
                            slack,
                            sharedReadIndex2,
                            sharedWriteIndex2);

    writer = new (&m_shm_writer)
                 SHM_Writer(startOfBuf1,
                            sizeOfBuffer,
                            slack,
                            sharedReadIndex1,
                            sharedWriteIndex1);

    // The client resets only the indexes it advances itself: its read
    // index in half 2 and its write index in half 1.
    * sharedReadIndex2 = 0;
    * sharedWriteIndex1 = 0;

    * clientStatusFlag = 1;
#ifdef DEBUG_TRANSPORTER
    printf("-- (%d - %d) - Client -\n", localNodeId, remoteNodeId);
    printf("Reader at: %ld (%p)\n", startOfBuf2 - shmBuf, startOfBuf2);
    printf("sharedReadIndex2 at %ld (%p) = %d\n",
           (char*)sharedReadIndex2-shmBuf,
           sharedReadIndex2, *sharedReadIndex2);
    printf("sharedWriteIndex2 at %ld (%p) = %d\n",
           (char*)sharedWriteIndex2-shmBuf,
           sharedWriteIndex2, *sharedWriteIndex2);

    printf("Writer at: %ld (%p)\n", startOfBuf1 - shmBuf, startOfBuf1);
    printf("sharedReadIndex1 at %ld (%p) = %d\n",
           (char*)sharedReadIndex1-shmBuf,
           sharedReadIndex1, *sharedReadIndex1);
    printf("sharedWriteIndex1 at %ld (%p) = %d\n",
           (char*)sharedWriteIndex1-shmBuf,
           sharedWriteIndex1, *sharedWriteIndex1);

    printf("sizeOfBuffer = %d\n", sizeOfBuffer);
#endif
  }
#ifdef DEBUG_TRANSPORTER
  printf("Mapping from %p to %p\n", shmBuf, shmBuf+shmSize);
#endif
  return false;
}
326 
327 bool
connect_server_impl(NDB_SOCKET_TYPE sockfd)328 SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
329 {
330   DBUG_ENTER("SHM_Transporter::connect_server_impl");
331   DEBUG_FPRINTF((stderr, "(%u)connect_server_impl(%u)\n",
332                  localNodeId, remoteNodeId));
333   SocketOutputStream s_output(sockfd);
334   SocketInputStream s_input(sockfd);
335 
336   // Create
337   if (!_shmSegCreated)
338   {
339     if (!ndb_shm_create())
340     {
341       DEBUG_FPRINTF((stderr, "(%u)connect_server_impl failed LINE:%d,"
342                              " to remote node %d\n",
343                      localNodeId, __LINE__, remoteNodeId));
344       DBUG_RETURN(false);
345     }
346     _shmSegCreated = true;
347     DEBUG_FPRINTF((stderr, "(%u)ndb_shm_create()(%u)\n",
348                    localNodeId, remoteNodeId));
349   }
350 
351   // Attach
352   if (!_attached)
353   {
354     if (!ndb_shm_attach())
355     {
356       DEBUG_FPRINTF((stderr, "(%u)connect_server_impl failed LINE:%d,"
357                              " to remote node %d\n",
358                      localNodeId, __LINE__, remoteNodeId));
359       DBUG_RETURN(false);
360     }
361     _attached = true;
362     DEBUG_FPRINTF((stderr, "(%u)ndb_shm_attach()(%u)\n",
363                    localNodeId, remoteNodeId));
364   }
365 
366   require(!setupBuffersDone);
367   {
368     DEBUG_FPRINTF((stderr, "(%u)setupBuffers(%u) Line:%d\n",
369                    localNodeId, remoteNodeId, __LINE__));
370     if (setupBuffers())
371     {
372       fprintf(stderr, "Shared memory not supported on this platform\n");
373       detach_shm(false);
374       DBUG_RETURN(false);
375     }
376     setupBuffersDone=true;
377   }
378 
379   // Send ok to client
380   s_output.println("shm server 1 ok: %d",
381 		   m_transporter_registry.m_shm_own_pid);
382 
383   // Wait for ok from client
384   char buf[256];
385   DBUG_PRINT("info", ("Wait for ok from client"));
386   if (s_input.gets(buf, sizeof(buf)) == 0)
387   {
388     DEBUG_FPRINTF((stderr, "(%u)connect_server_impl failed LINE:%d,"
389                            " to remote node %d\n",
390                    localNodeId, __LINE__, remoteNodeId));
391     detach_shm(false);
392     DBUG_RETURN(false);
393   }
394 
395   if (sscanf(buf, "shm client 1 ok: %d", &m_remote_pid) != 1)
396   {
397     DEBUG_FPRINTF((stderr, "(%u)connect_server_impl failed LINE:%d,"
398                            " to remote node %d\n",
399                    localNodeId, __LINE__, remoteNodeId));
400     detach_shm(false);
401     DBUG_RETURN(false);
402   }
403 
404   DEBUG_FPRINTF((stderr, "(%u)connect_common()(%u)\n",
405                  localNodeId, remoteNodeId));
406   int r= connect_common(sockfd);
407 
408   if (r)
409   {
410     // Send ok to client
411     s_output.println("shm server 2 ok");
412     // Wait for ok from client
413     if (s_input.gets(buf, 256) == 0)
414     {
415       DEBUG_FPRINTF((stderr, "(%u)connect_server_impl failed LINE:%d,"
416                              " to remote node %d\n",
417                      localNodeId, __LINE__, remoteNodeId));
418       detach_shm(false);
419       DBUG_RETURN(false);
420     }
421     DBUG_PRINT("info", ("Successfully connected server to node %d",
422                 remoteNodeId));
423   }
424   DEBUG_FPRINTF((stderr, "(%u)set_socket()(%u)\n",
425                  localNodeId, remoteNodeId));
426   set_socket(sockfd);
427   DBUG_RETURN(r);
428 }
429 
430 void
set_socket(NDB_SOCKET_TYPE sockfd)431 SHM_Transporter::set_socket(NDB_SOCKET_TYPE sockfd)
432 {
433   set_get(sockfd, IPPROTO_TCP, TCP_NODELAY, "TCP_NODELAY", 1);
434   set_get(sockfd, SOL_SOCKET, SO_KEEPALIVE, "SO_KEEPALIVE", 1);
435   ndb_socket_nonblock(sockfd, true);
436   get_callback_obj()->lock_transporter(remoteNodeId, m_transporter_index);
437   theSocket = sockfd;
438   send_checksum_state.init();
439   get_callback_obj()->unlock_transporter(remoteNodeId, m_transporter_index);
440 }
441 
442 bool
connect_client_impl(NDB_SOCKET_TYPE sockfd)443 SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
444 {
445   DBUG_ENTER("SHM_Transporter::connect_client_impl");
446   DEBUG_FPRINTF((stderr, "(%u)connect_client_impl(%u)\n",
447                  localNodeId, remoteNodeId));
448   SocketInputStream s_input(sockfd);
449   SocketOutputStream s_output(sockfd);
450   char buf[256];
451 
452   // Wait for server to create and attach
453   DBUG_PRINT("info", ("Wait for server to create and attach"));
454   if (s_input.gets(buf, 256) == 0)
455   {
456     DEBUG_FPRINTF((stderr, "(%u)connect_client_impl failed LINE:%d,"
457                            " to remote node %d\n",
458                            localNodeId, __LINE__, remoteNodeId));
459 
460     DBUG_PRINT("error", ("Server id %d did not attach",
461                 remoteNodeId));
462     DBUG_RETURN(false);
463   }
464 
465   if(sscanf(buf, "shm server 1 ok: %d", &m_remote_pid) != 1)
466   {
467     DEBUG_FPRINTF((stderr, "(%u)connect_client_impl failed LINE:%d,"
468                            " to remote node %d\n",
469                            localNodeId, __LINE__, remoteNodeId));
470     DBUG_RETURN(false);
471   }
472 
473   // Create
474   if(!_shmSegCreated)
475   {
476     if (!ndb_shm_get())
477     {
478       DEBUG_FPRINTF((stderr, "(%u)connect_client_impl failed LINE:%d,"
479                              " to remote node %d\n",
480                              localNodeId, __LINE__, remoteNodeId));
481       DBUG_PRINT("error", ("Failed create of shm seg to node %d",
482                   remoteNodeId));
483       DBUG_RETURN(false);
484     }
485     _shmSegCreated = true;
486     DEBUG_FPRINTF((stderr, "(%u)ndb_shm_get(%u)\n",
487                    localNodeId, remoteNodeId));
488   }
489 
490   // Attach
491   if (!_attached)
492   {
493     if (!ndb_shm_attach())
494     {
495       DEBUG_FPRINTF((stderr, "(%u)connect_client_impl failed LINE:%d,"
496                              " to remote node %d\n",
497                              localNodeId, __LINE__, remoteNodeId));
498       DBUG_PRINT("error", ("Failed attach of shm seg to node %d",
499                   remoteNodeId));
500       DBUG_RETURN(false);
501     }
502     _attached = true;
503     DEBUG_FPRINTF((stderr, "(%u)ndb_shm_attach(%u)\n",
504                    localNodeId, remoteNodeId));
505   }
506 
507   require(!setupBuffersDone);
508   {
509     DEBUG_FPRINTF((stderr, "(%u)setupBuffers(%u) Line:%d\n",
510                    localNodeId, remoteNodeId, __LINE__));
511     if (setupBuffers())
512     {
513       fprintf(stderr, "Shared memory not supported on this platform\n");
514       detach_shm(false);
515       DBUG_RETURN(false);
516     }
517     else
518     {
519       setupBuffersDone=true;
520     }
521   }
522 
523   // Send ok to server
524   s_output.println("shm client 1 ok: %d",
525 		   m_transporter_registry.m_shm_own_pid);
526 
527   DEBUG_FPRINTF((stderr, "(%u)connect_common(%u)\n",
528                  localNodeId, remoteNodeId));
529   int r = connect_common(sockfd);
530 
531   if (r)
532   {
533     // Wait for ok from server
534     DBUG_PRINT("info", ("Wait for ok from server"));
535     if (s_input.gets(buf, 256) == 0)
536     {
537       DEBUG_FPRINTF((stderr, "(%u)connect_client_impl failed LINE:%d,"
538                              " to remote node %d\n",
539                              localNodeId, __LINE__, remoteNodeId));
540       DBUG_PRINT("error", ("No ok from server node %d",
541                   remoteNodeId));
542       detach_shm(false);
543       DBUG_RETURN(false);
544     }
545     // Send ok to server
546     s_output.println("shm client 2 ok");
547     DBUG_PRINT("info", ("Successfully connected client to node %d",
548                 remoteNodeId));
549   }
550   else
551   {
552     DEBUG_FPRINTF((stderr, "(%u)connect_client_impl failed LINE:%d,"
553                            " to remote node %d\n",
554                            localNodeId, __LINE__, remoteNodeId));
555     detach_shm(false);
556   }
557   set_socket(sockfd);
558   DEBUG_FPRINTF((stderr, "(%u)set_socket(%u)\n",
559                  localNodeId, remoteNodeId));
560   DBUG_RETURN(r);
561 }
562 
563 bool
connect_common(NDB_SOCKET_TYPE sockfd)564 SHM_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
565 {
566   if (!checkConnected())
567   {
568     DEBUG_FPRINTF((stderr, "(%u)checkConnected failed(%u)\n",
569                    localNodeId, remoteNodeId));
570     return false;
571   }
572   if (isServer)
573   {
574     DEBUG_FPRINTF((stderr, "(%u)ndb_shm_destroy(%u)\n",
575                    localNodeId, remoteNodeId));
576     ndb_shm_destroy();
577   }
578 
579   require(setupBuffersDone);
580   Uint32 waited = 0;
581   while (waited < m_timeOutMillis)
582   {
583     if (*serverStatusFlag == 1 && *clientStatusFlag == 1)
584       return true;
585     NdbSleep_MilliSleep(10);
586     waited += 10;
587   }
588   DEBUG_FPRINTF((stderr, "(%u)setupBuffers(%u) failed\n",
589                  localNodeId, remoteNodeId));
590   DBUG_PRINT("error", ("Failed to set up buffers to node %d",
591               remoteNodeId));
592   return false;
593 }
594 
595 void
remove_mutexes()596 SHM_Transporter::remove_mutexes()
597 {
598   if (ndb_socket_valid(theSocket))
599   {
600     NdbMutex_Deinit(serverMutex);
601     NdbMutex_Deinit(clientMutex);
602   }
603 }
604 
setupBuffersUndone()605 void SHM_Transporter::setupBuffersUndone()
606 {
607   if (setupBuffersDone)
608   {
609     NdbMutex_Lock(serverMutex);
610     NdbMutex_Lock(clientMutex);
611     setupBuffersDone = false;
612     DEBUG_FPRINTF((stderr, "(%u)setupBuffersUndone(%u)\n",
613                    localNodeId, remoteNodeId));
614     NdbMutex_Unlock(serverMutex);
615     NdbMutex_Unlock(clientMutex);
616   }
617 }
618 
619 void
disconnect_socket()620 SHM_Transporter::disconnect_socket()
621 {
622   get_callback_obj()->lock_transporter(remoteNodeId, m_transporter_index);
623 
624   NDB_SOCKET_TYPE sock = theSocket;
625   ndb_socket_invalidate(&theSocket);
626 
627 
628   if(ndb_socket_valid(sock))
629   {
630     if(ndb_socket_close(sock) < 0){
631       report_error(TE_ERROR_CLOSING_SOCKET);
632     }
633   }
634   setupBuffersUndone();
635   get_callback_obj()->unlock_transporter(remoteNodeId, m_transporter_index);
636 }
637 
638 /**
639  * This method is used when we need to wake up other side to
640  * ensure that the messages we transported in shared memory
641  * transporter is quickly handled.
642  *
643  * The first step is to grab a mutex from the shared memory segment,
644  * next we check the status of the transporter on the other side. If
645  * this transporter is asleep we will simply send 1 byte, it doesn't
646  * matter what the byte value is. We set it to 0 just to ensure it
647  * has defined value for potential future use.
648  *
649  * If we discover that the other side is awake there is no need to
650  * do anything, the other side will check the shared memory before
651  * it goes to sleep.
652  */
653 void
wakeup()654 SHM_Transporter::wakeup()
655 {
656   Uint32 one_more_try = 5;
657   char buf[1];
658   int iovcnt = 1;
659   struct iovec iov[1];
660 
661   lock_reverse_mutex();
662   bool awake_state = handle_reverse_awake_state();
663   unlock_reverse_mutex();
664   if (awake_state)
665   {
666     return;
667   }
668   iov[0].iov_len = 1;
669   iov[0].iov_base = &buf[0];
670   buf[0] = 0;
671   do
672   {
673     one_more_try--;
674     int nBytesSent = (int)ndb_socket_writev(theSocket, iov, iovcnt);
675     if (nBytesSent != 1)
676     {
677       require(nBytesSent < 0); //Should not be possible with any other value
678       int err = ndb_socket_errno();
679       if (DISCONNECT_ERRNO(err, nBytesSent))
680       {
681         do_disconnect(err, true);
682       }
683     }
684     else
685     {
686       return;
687     }
688   } while (one_more_try);
689 }
690 
691 void
doReceive()692 SHM_Transporter::doReceive()
693 {
694   bool one_more_try;
695   char buf[128];
696   do
697   {
698     one_more_try = false;
699     const int nBytesRead = (int)ndb_recv(theSocket, buf, sizeof(buf), 0);
700     if (unlikely(nBytesRead <= 0))
701     {
702       int err;
703       if (nBytesRead == 0)
704       {
705         err = 0;
706       }
707       else
708       {
709         err = ndb_socket_errno();
710       }
711       if (DISCONNECT_ERRNO(err, nBytesRead))
712       {
713         do_disconnect(err, false);
714       }
715       else
716       {
717         one_more_try = false;
718       }
719     }
720     else if (unlikely(nBytesRead == sizeof(buf)))
721     {
722       one_more_try = true;
723     }
724   } while (one_more_try);
725 }
726 
727 /**
728  * The need_wakeup flag is always set except when called from
729  * forceSend in mt.cpp, in this case we only send to try to
730  * free up some send buffers. So there is no need to ensure
731  * that the other side is awakened in this special case.
732  */
733 bool
doSend(bool need_wakeup)734 SHM_Transporter::doSend(bool need_wakeup)
735 {
736   struct iovec iov[64];
737   Uint32 cnt = fetch_send_iovec_data(iov, NDB_ARRAY_SIZE(iov));
738 
739   if (!setupBuffersDone)
740   {
741     DEBUG_FPRINTF((stderr, "(%u)doSend(%u)\n", localNodeId, remoteNodeId));
742     return false;
743   }
744   if (cnt == 0)
745   {
746     /**
747      * Need to handle the wakeup flag, even when there is nothing to
748      * send. We can call doSend in an attempt to do an emergency send.
749      * In this case we could register a pending send even with an
750      * empty send buffer. So this could lead to a later doSend call
751      * that have no data to send. So the idea is to delay the wakeup
752      * until end of execution even if the send buffer is full in the
753      * middle of executing signals.
754      */
755     if (need_wakeup)
756     {
757       wakeup();
758     }
759     return false;
760   }
761 
762   Uint32 sum = 0;
763   for(Uint32 i = 0; i<cnt; i++)
764   {
765     assert(iov[i].iov_len);
766     sum += iov[i].iov_len;
767   }
768 
769   int nBytesSent = writer->writev(iov, cnt);
770 #if 0
771   time_t curr_time;
772   tm tm_buf;
773   curr_time = ::time((time_t*)NULL);
774   ndb_localtime_r(&curr_time, &tm_buf);
775   Uint32 minute = tm_buf.tm_min;
776   Uint32 second = tm_buf.tm_sec;
777   Uint64 millis = NdbTick_CurrentMillisecond();
778   DEBUG_FPRINTF((stderr, "%u.%u.%llu (%u)W:writev(%u),"
779                          " sent: %d, free: %u"
780                          ", w_inx: %u, r_inx: %u\n",
781                 minute, second,
782                 millis % Uint64(1000),
783                 localNodeId, remoteNodeId, nBytesSent,
784                 writer->get_free_buffer(),
785                 writer->getWriteIndex(),
786                 writer->getReadIndex()));
787 #endif
788   if (nBytesSent > 0)
789   {
790     iovec_data_sent(nBytesSent);
791     m_bytes_sent += nBytesSent;
792     sendCount++;
793     sendSize += nBytesSent;
794     if (sendCount >= reportFreq)
795     {
796       get_callback_obj()->reportSendLen(remoteNodeId, sendCount, sendSize);
797       sendCount = 0;
798       sendSize  = 0;
799     }
800 
801     if (need_wakeup)
802     {
803       wakeup();
804     }
805     if (Uint32(nBytesSent) == sum &&
806         (cnt != NDB_ARRAY_SIZE(iov)) &&
807         need_wakeup)
808     {
809       return false;
810     }
811     return true;
812   }
813   return true;
814 }
815 
816 /*
817  * We need the extra m_client_locked and m_server_locked
818  * variables to ensure that we don't unlock something
819  * that was never locked. The timing of the setting up
820  * of buffers and locking of mutexes isn't perfect,
821  * therefore we protect those calls through these variables.
822  */
823 void
lock_mutex()824 SHM_Transporter::lock_mutex()
825 {
826   if (setupBuffersDone)
827   {
828     if (isServer)
829     {
830       NdbMutex_Lock(serverMutex);
831       m_server_locked = true;
832     }
833     else
834     {
835       NdbMutex_Lock(clientMutex);
836       m_client_locked = true;
837     }
838   }
839 }
840 
841 void
unlock_mutex()842 SHM_Transporter::unlock_mutex()
843 {
844   if (setupBuffersDone)
845   {
846     if (isServer)
847     {
848       if (m_server_locked)
849         NdbMutex_Unlock(serverMutex);
850     }
851     else
852     {
853       if (m_client_locked)
854         NdbMutex_Unlock(clientMutex);
855     }
856   }
857 }
858 
859 void
lock_reverse_mutex()860 SHM_Transporter::lock_reverse_mutex()
861 {
862   if (setupBuffersDone)
863   {
864     if (isServer)
865     {
866       NdbMutex_Lock(clientMutex);
867       m_client_locked = true;
868     }
869     else
870     {
871       NdbMutex_Lock(serverMutex);
872       m_server_locked = true;
873     }
874   }
875 }
876 
877 void
unlock_reverse_mutex()878 SHM_Transporter::unlock_reverse_mutex()
879 {
880   if (setupBuffersDone)
881   {
882     if (isServer)
883     {
884       if (m_client_locked)
885         NdbMutex_Unlock(clientMutex);
886     }
887     else
888     {
889       if (m_server_locked)
890         NdbMutex_Unlock(serverMutex);
891     }
892   }
893 }
894 
895 void
set_awake_state(Uint32 awake_state)896 SHM_Transporter::set_awake_state(Uint32 awake_state)
897 {
898   if (setupBuffersDone)
899   {
900     if (isServer)
901     {
902       *serverStatusFlag = awake_state;
903       *serverAwakenedFlag = 0;
904     }
905     else
906     {
907       *clientStatusFlag = awake_state;
908       *clientAwakenedFlag = 0;
909     }
910   }
911 }
912 
913 bool
handle_reverse_awake_state()914 SHM_Transporter::handle_reverse_awake_state()
915 {
916   /**
917    * We are sending to the other side. We need to understand if we
918    * should send a wakeup byte to the other side. If we already did
919    * so and the other side still hasn't woke up, we need not do it
920    * again. If the other side is awake we also need not send any
921    * wakeup byte.
922    */
923   if (setupBuffersDone)
924   {
925     if (isServer)
926     {
927       if (*clientStatusFlag == 1 || *clientAwakenedFlag == 1)
928       {
929         return true;
930       }
931       else
932       {
933         *clientAwakenedFlag = 1;
934         return false;
935       }
936     }
937     else
938     {
939       if (*serverStatusFlag == 1 || *serverAwakenedFlag == 1)
940       {
941         return true;
942       }
943       else
944       {
945         *serverAwakenedFlag = 1;
946         return false;
947       }
948     }
949   }
950   else
951   {
952     return true;
953   }
954 }
955 
956 void
updateReceivePtr(TransporterReceiveHandle & recvdata,Uint32 * ptr)957 SHM_Transporter::updateReceivePtr(TransporterReceiveHandle& recvdata,
958                                   Uint32 *ptr)
959 {
960   Uint32 size_read = reader->updateReadPtr(ptr);
961 #if 0
962   time_t curr_time;
963   tm tm_buf;
964   curr_time = ::time((time_t*)NULL);
965   ndb_localtime_r(&curr_time, &tm_buf);
966   Uint32 minute = tm_buf.tm_min;
967   Uint32 second = tm_buf.tm_sec;
968   Uint64 millis = NdbTick_CurrentMillisecond();
969   DEBUG_FPRINTF((stderr, "%u.%u.%llu (%u)updateReadPtr(%u),"
970                          " sz_read: %u, r_inx: %u"
971                          ", w_inx: %u\n",
972                  minute, second,
973                  millis % Uint64(1000),
974                  localNodeId, remoteNodeId,
975                  size_read,
976                  reader->getReadIndex(),
977                  reader->getWriteIndex()));
978 #endif
979   receiveCount++;
980   receiveSize += size_read;
981   m_bytes_received += size_read;
982   if (receiveCount == reportFreq)
983   {
984     recvdata.reportReceiveLen(remoteNodeId,
985                               receiveCount,
986                               receiveSize);
987     receiveCount = 0;
988     receiveSize = 0;
989   }
990 }
991 
992 /**
993  * send_is_possible is only called in situations with high load.
994  * So it is not critical to use the mutex protection here.
995  */
996 bool
send_is_possible(int timeout_millisec) const997 SHM_Transporter::send_is_possible(int timeout_millisec) const
998 {
999   do
1000   {
1001     if (setupBuffersDone)
1002     {
1003       if (writer->get_free_buffer() > MAX_SEND_MESSAGE_BYTESIZE)
1004       {
1005         return true;
1006       }
1007       if (timeout_millisec > 0)
1008       {
1009         DEBUG_FPRINTF((stderr, "send_is_possible, wait 10ms\n"));
1010         NdbSleep_MilliSleep(timeout_millisec);
1011         timeout_millisec = 0;
1012       }
1013       DEBUG_FPRINTF((stderr, "send_is_possible, timed out\n"));
1014       return false;
1015     }
1016     else
1017     {
1018       break;
1019     }
1020   } while (1);
1021   return true;
1022 }
1023