1 /* Copyright (c) 2003-2005, 2007 MySQL AB
2    Use is subject to license terms
3 
4    This program is free software; you can redistribute it and/or modify
5    it under the terms of the GNU General Public License as published by
6    the Free Software Foundation; version 2 of the License.
7 
8    This program is distributed in the hope that it will be useful,
9    but WITHOUT ANY WARRANTY; without even the implied warranty of
10    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
11    GNU General Public License for more details.
12 
13    You should have received a copy of the GNU General Public License
14    along with this program; if not, write to the Free Software
15    Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA */
16 
17 #include <ndb_global.h>
18 
19 #include "SCI_Transporter.hpp"
20 #include <NdbOut.hpp>
21 #include <NdbSleep.h>
22 #include <NdbTick.h>
23 #include <NdbTick.h>
24 
25 #include "TransporterInternalDefinitions.hpp"
26 #include <TransporterCallback.hpp>
27 
28 #include <InputStream.hpp>
29 #include <OutputStream.hpp>
30 
// Flag word handed to the SISCI calls in this file that take a "flags"
// argument (SCIOpen, SCIClose, SCIPrepareSegment, ...): no special flags.
#define FLAGS 0
// When defined, the send-size statistics counters (i1024, i2048, ...) are
// maintained in doSend() and printed from disconnectImpl().
#define DEBUG_TRANSPORTER
SCI_Transporter(TransporterRegistry & t_reg,const char * lHostName,const char * rHostName,int r_port,bool isMgmConnection,Uint32 packetSize,Uint32 bufferSize,Uint32 nAdapters,Uint16 remoteSciNodeId0,Uint16 remoteSciNodeId1,NodeId _localNodeId,NodeId _remoteNodeId,NodeId serverNodeId,bool chksm,bool signalId,Uint32 reportFreq)33 SCI_Transporter::SCI_Transporter(TransporterRegistry &t_reg,
34                                  const char *lHostName,
35                                  const char *rHostName,
36                                  int r_port,
37 				 bool isMgmConnection,
38                                  Uint32 packetSize,
39 				 Uint32 bufferSize,
40 				 Uint32 nAdapters,
41 				 Uint16 remoteSciNodeId0,
42 				 Uint16 remoteSciNodeId1,
43 				 NodeId _localNodeId,
44 				 NodeId _remoteNodeId,
45 				 NodeId serverNodeId,
46 				 bool chksm,
47 				 bool signalId,
48 				 Uint32 reportFreq) :
49   Transporter(t_reg, tt_SCI_TRANSPORTER,
50 	      lHostName, rHostName, r_port, isMgmConnection, _localNodeId,
51               _remoteNodeId, serverNodeId, 0, false, chksm, signalId)
52 {
53   DBUG_ENTER("SCI_Transporter::SCI_Transporter");
54   m_PacketSize = (packetSize + 3)/4 ;
55   m_BufferSize = bufferSize;
56   m_sendBuffer.m_buffer = NULL;
57 
58   m_RemoteSciNodeId = remoteSciNodeId0;
59 
60   if(remoteSciNodeId0 == 0 || remoteSciNodeId1 == 0)
61     m_numberOfRemoteNodes=1;
62   else
63     m_numberOfRemoteNodes=2;
64 
65   m_RemoteSciNodeId1 = remoteSciNodeId1;
66 
67 
68   m_initLocal=false;
69   m_failCounter=0;
70   m_remoteNodes[0]=remoteSciNodeId0;
71   m_remoteNodes[1]=remoteSciNodeId1;
72   m_adapters = nAdapters;
73   m_ActiveAdapterId=0;
74   m_StandbyAdapterId=1;
75 
76   m_mapped = false;
77   m_sciinit=false;
78 
79   sciAdapters= new SciAdapter[nAdapters* (sizeof (SciAdapter))];
80   if(sciAdapters==NULL) {
81   }
82   m_SourceSegm= new sourceSegm[nAdapters* (sizeof (sourceSegm))];
83   if(m_SourceSegm==NULL) {
84   }
85   m_TargetSegm= new targetSegm[nAdapters* (sizeof (targetSegm))];
86   if(m_TargetSegm==NULL) {
87   }
88   m_reportFreq= reportFreq;
89 
90   //reset all statistic counters.
91 #ifdef DEBUG_TRANSPORTER
92  i1024=0;
93  i2048=0;
94  i2049=0;
95  i10242048=0;
96  i20484096=0;
97  i4096=0;
98  i4097=0;
99 #endif
100   DBUG_VOID_RETURN;
101 }
102 
disconnectImpl()103 void SCI_Transporter::disconnectImpl()
104 {
105   DBUG_ENTER("SCI_Transporter::disconnectImpl");
106   sci_error_t err;
107   if(m_mapped){
108     setDisconnect();
109     DBUG_PRINT("info", ("connect status = %d, remote node = %d",
110     (int)getConnectionStatus(), remoteNodeId));
111     disconnectRemote();
112     disconnectLocal();
113   }
114 
115   // Empty send buffer
116 
117   m_sendBuffer.m_dataSize = 0;
118 
119   m_initLocal=false;
120   m_mapped = false;
121 
122   if(m_sciinit) {
123     for(Uint32 i=0; i<m_adapters ; i++) {
124       SCIClose(sciAdapters[i].scidesc, FLAGS, &err);
125 
126       if(err != SCI_ERR_OK)  {
127 	report_error(TE_SCI_UNABLE_TO_CLOSE_CHANNEL);
128         DBUG_PRINT("error",
129         ("Cannot close channel to the driver. Error code 0x%x",
130 		    err));
131       }
132     }
133   }
134   m_sciinit=false;
135 
136 #ifdef DEBUG_TRANSPORTER
137       ndbout << "total: " <<  i1024+ i10242048 + i2048+i2049 << endl;
138       ndbout << "<1024: " << i1024 << endl;
139       ndbout << "1024-2047: " << i10242048 << endl;
140       ndbout << "==2048: " << i2048 << endl;
141       ndbout << "2049-4096: " << i20484096 << endl;
142       ndbout << "==4096: " << i4096 << endl;
143       ndbout << ">4096: " << i4097 << endl;
144 #endif
145   DBUG_VOID_RETURN;
146 }
147 
148 
initTransporter()149 bool SCI_Transporter::initTransporter() {
150   DBUG_ENTER("SCI_Transporter::initTransporter");
151   if(m_BufferSize < (2*MAX_MESSAGE_SIZE + 4096)){
152     m_BufferSize = 2 * MAX_MESSAGE_SIZE + 4096;
153   }
154 
155   // Allocate buffers for sending, send buffer size plus 2048 bytes for avoiding
156   // the need to send twice when a large message comes around. Send buffer size is
157   // measured in words.
158   Uint32 sz = 4 * m_PacketSize + MAX_MESSAGE_SIZE;;
159 
160   m_sendBuffer.m_sendBufferSize = 4 * ((sz + 3) / 4);
161   m_sendBuffer.m_buffer = new Uint32[m_sendBuffer.m_sendBufferSize / 4];
162   m_sendBuffer.m_dataSize = 0;
163 
164   DBUG_PRINT("info",
165   ("Created SCI Send Buffer with buffer size %d and packet size %d",
166               m_sendBuffer.m_sendBufferSize, m_PacketSize * 4));
167   if(!getLinkStatus(m_ActiveAdapterId) ||
168      (m_adapters > 1 &&
169      !getLinkStatus(m_StandbyAdapterId))) {
170     DBUG_PRINT("error",
171     ("The link is not fully operational. Check the cables and the switches"));
172     //NDB should terminate
173     report_error(TE_SCI_LINK_ERROR);
174     DBUG_RETURN(false);
175   }
176   DBUG_RETURN(true);
177 } // initTransporter()
178 
179 
180 
getLocalNodeId(Uint32 adapterNo)181 Uint32 SCI_Transporter::getLocalNodeId(Uint32 adapterNo)
182 {
183   sci_query_adapter_t queryAdapter;
184   sci_error_t  error;
185   Uint32 _localNodeId;
186 
187   queryAdapter.subcommand = SCI_Q_ADAPTER_NODEID;
188   queryAdapter.localAdapterNo = adapterNo;
189   queryAdapter.data = &_localNodeId;
190 
191   SCIQuery(SCI_Q_ADAPTER,(void*)(&queryAdapter),(Uint32)NULL,&error);
192 
193   if(error != SCI_ERR_OK)
194     return 0;
195   return _localNodeId;
196 }
197 
198 
getLinkStatus(Uint32 adapterNo)199 bool SCI_Transporter::getLinkStatus(Uint32 adapterNo)
200 {
201   sci_query_adapter_t queryAdapter;
202   sci_error_t  error;
203   int linkstatus;
204   queryAdapter.subcommand = SCI_Q_ADAPTER_LINK_OPERATIONAL;
205 
206   queryAdapter.localAdapterNo = adapterNo;
207   queryAdapter.data = &linkstatus;
208 
209   SCIQuery(SCI_Q_ADAPTER,(void*)(&queryAdapter),(Uint32)NULL,&error);
210 
211   if(error != SCI_ERR_OK) {
212     DBUG_PRINT("error", ("error %d querying adapter", error));
213     return false;
214   }
215   if(linkstatus<=0)
216     return false;
217   return true;
218 }
219 
220 
221 
initLocalSegment()222 sci_error_t SCI_Transporter::initLocalSegment() {
223   DBUG_ENTER("SCI_Transporter::initLocalSegment");
224   Uint32 segmentSize = m_BufferSize;
225   Uint32 offset  = 0;
226   sci_error_t err;
227   if(!m_sciinit) {
228     for(Uint32 i=0; i<m_adapters ; i++) {
229       SCIOpen(&(sciAdapters[i].scidesc), FLAGS, &err);
230       sciAdapters[i].localSciNodeId=getLocalNodeId(i);
231       DBUG_PRINT("info", ("SCInode iD %d  adapter %d\n",
232 	         sciAdapters[i].localSciNodeId, i));
233       if(err != SCI_ERR_OK) {
234         DBUG_PRINT("error",
235         ("Cannot open an SCI virtual device. Error code 0x%x",
236 		   err));
237 	DBUG_RETURN(err);
238       }
239     }
240   }
241 
242   m_sciinit=true;
243 
244   SCICreateSegment(sciAdapters[0].scidesc,
245 		   &(m_SourceSegm[0].localHandle),
246 		   hostSegmentId(localNodeId, remoteNodeId),
247 		   segmentSize,
248 		   0,
249 		   0,
250 		   0,
251 		   &err);
252 
253   if(err != SCI_ERR_OK) {
254     DBUG_PRINT("error", ("Error creating segment, err = 0x%x", err));
255     DBUG_RETURN(err);
256   } else {
257     DBUG_PRINT("info", ("created segment id : %d",
258 	       hostSegmentId(localNodeId, remoteNodeId)));
259   }
260 
261   /** Prepare the segment*/
262   for(Uint32 i=0; i < m_adapters; i++) {
263     SCIPrepareSegment((m_SourceSegm[0].localHandle),
264 		      i,
265 		      FLAGS,
266 		      &err);
267 
268     if(err != SCI_ERR_OK) {
269       DBUG_PRINT("error",
270     ("Local Segment is not accessible by an SCI adapter. Error code 0x%x\n",
271                   err));
272       DBUG_RETURN(err);
273     }
274   }
275 
276 
277   m_SourceSegm[0].mappedMemory =
278     SCIMapLocalSegment((m_SourceSegm[0].localHandle),
279 		       &(m_SourceSegm[0].lhm[0].map),
280 		       offset,
281 		       segmentSize,
282 		       NULL,
283 		       FLAGS,
284 		       &err);
285 
286 
287 
288   if(err != SCI_ERR_OK) {
289     DBUG_PRINT("error", ("Cannot map area of size %d. Error code 0x%x",
290 	        segmentSize,err));
291     doDisconnect();
292     DBUG_RETURN(err);
293   }
294 
295 
296   /** Make the local segment available*/
297   for(Uint32 i=0; i < m_adapters; i++) {
298     SCISetSegmentAvailable((m_SourceSegm[0].localHandle),
299 			     i,
300 			   FLAGS,
301 			   &err);
302 
303     if(err != SCI_ERR_OK) {
304       DBUG_PRINT("error",
305    ("Local Segment is not available for remote connections. Error code 0x%x\n",
306                  err));
307       DBUG_RETURN(err);
308     }
309   }
310   setupLocalSegment();
311   DBUG_RETURN(err);
312 
313 } // initLocalSegment()
314 
315 
doSend()316 bool SCI_Transporter::doSend() {
317 #ifdef DEBUG_TRANSPORTER
318   NDB_TICKS startSec=0, stopSec=0;
319   Uint32 startMicro=0, stopMicro=0, totalMicro=0;
320 #endif
321   sci_error_t             err;
322   Uint32 retry=0;
323 
324   const char * const sendPtr = (char*)m_sendBuffer.m_buffer;
325   const Uint32 sizeToSend    = 4 * m_sendBuffer.m_dataSize; //Convert to number of bytes
326 
327   if (sizeToSend > 0){
328 #ifdef DEBUG_TRANSPORTER
329     if(sizeToSend < 1024 )
330       i1024++;
331     if(sizeToSend > 1024 && sizeToSend < 2048 )
332       i10242048++;
333     if(sizeToSend==2048)
334       i2048++;
335     if(sizeToSend>2048 && sizeToSend < 4096)
336       i20484096++;
337     if(sizeToSend==4096)
338       i4096++;
339     if(sizeToSend==4097)
340       i4097++;
341 #endif
342 
343   tryagain:
344     retry++;
345     if (retry > 3) {
346       DBUG_PRINT("error", ("SCI Transfer failed"));
347       report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR);
348       return false;
349     }
350     Uint32 * insertPtr = (Uint32 *)
351       (m_TargetSegm[m_ActiveAdapterId].writer)->getWritePtr(sizeToSend);
352 
353     if(insertPtr != 0) {
354 
355       const Uint32 remoteOffset=(Uint32)
356 	((char*)insertPtr -
357 	 (char*)(m_TargetSegm[m_ActiveAdapterId].mappedMemory));
358 
359       SCIMemCpy(m_TargetSegm[m_ActiveAdapterId].sequence,
360 		(void*)sendPtr,
361 		m_TargetSegm[m_ActiveAdapterId].rhm[m_ActiveAdapterId].map,
362 		remoteOffset,
363 		sizeToSend,
364 		SCI_FLAG_ERROR_CHECK,
365 		&err);
366 
367       if (err != SCI_ERR_OK) {
368         if (err == SCI_ERR_OUT_OF_RANGE ||
369             err == SCI_ERR_SIZE_ALIGNMENT ||
370             err == SCI_ERR_OFFSET_ALIGNMENT) {
371           DBUG_PRINT("error", ("Data transfer error = %d", err));
372           report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR);
373 	  return false;
374         }
375         if(err == SCI_ERR_TRANSFER_FAILED) {
376 	  if(getLinkStatus(m_ActiveAdapterId))
377 	    goto tryagain;
378           if (m_adapters == 1) {
379             DBUG_PRINT("error", ("SCI Transfer failed"));
380             report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR);
381 	    return false;
382           }
383 	  m_failCounter++;
384 	  Uint32 temp=m_ActiveAdapterId;
385 	  if (getLinkStatus(m_StandbyAdapterId)) {
386 	    failoverShmWriter();
387 	    SCIStoreBarrier(m_TargetSegm[m_StandbyAdapterId].sequence,0);
388 	    m_ActiveAdapterId=m_StandbyAdapterId;
389 	    m_StandbyAdapterId=temp;
390             DBUG_PRINT("error", ("Swapping from adapter %u to %u",
391                        m_StandbyAdapterId, m_ActiveAdapterId));
392 	  } else {
393 	    report_error(TE_SCI_UNRECOVERABLE_DATA_TFX_ERROR);
394             DBUG_PRINT("error", ("SCI Transfer failed"));
395 	  }
396         }
397       } else {
398 	SHM_Writer * writer = (m_TargetSegm[m_ActiveAdapterId].writer);
399 	writer->updateWritePtr(sizeToSend);
400 
401 	Uint32 sendLimit = writer->getBufferSize();
402 	sendLimit -= writer->getWriteIndex();
403 
404 	m_sendBuffer.m_dataSize = 0;
405 	m_sendBuffer.m_forceSendLimit = sendLimit;
406       }
407     } else {
408       /**
409        * If we end up here, the SCI segment is full.
410        */
411       DBUG_PRINT("error", ("the segment is full for some reason"));
412       return false;
413     } //if
414   }
415   return true;
416 } // doSend()
417 
418 
419 
// Called by doSend() right before it swaps the active and standby adapters.
// Currently a no-op: the index-copying code below is compiled out.
// NOTE(review): the disabled call passes the standby writer as both the
// target and the source of copyIndexes(); presumably the source was meant
// to be the active writer -- confirm before re-enabling.
void SCI_Transporter::failoverShmWriter() {
#if 0
  (m_TargetSegm[m_StandbyAdapterId].writer)
    ->copyIndexes((m_TargetSegm[m_StandbyAdapterId].writer));
#endif
} //failoverShm
426 
427 
setupLocalSegment()428 void SCI_Transporter::setupLocalSegment()
429 {
430    DBUG_ENTER("SCI_Transporter::setupLocalSegment");
431    Uint32 sharedSize = 0;
432    sharedSize =4096;   //start of the buffer is page aligend
433 
434    Uint32 sizeOfBuffer = m_BufferSize;
435 
436    sizeOfBuffer -= sharedSize;
437 
438    Uint32 * localReadIndex =
439      (Uint32*)m_SourceSegm[m_ActiveAdapterId].mappedMemory;
440    Uint32 * localWriteIndex =  (Uint32*)(localReadIndex+ 1);
441    m_localStatusFlag = (Uint32*)(localReadIndex + 3);
442 
443    char * localStartOfBuf = (char*)
444      ((char*)m_SourceSegm[m_ActiveAdapterId].mappedMemory+sharedSize);
445 
446    * localReadIndex = 0;
447    * localWriteIndex = 0;
448 
449    const Uint32 slack = MAX_MESSAGE_SIZE;
450 
451    reader = new SHM_Reader(localStartOfBuf,
452 			   sizeOfBuffer,
453 			   slack,
454 			   localReadIndex,
455 			   localWriteIndex);
456 
457    reader->clear();
458    DBUG_VOID_RETURN;
459 } //setupLocalSegment
460 
setupRemoteSegment()461 void SCI_Transporter::setupRemoteSegment()
462 {
463    DBUG_ENTER("SCI_Transporter::setupRemoteSegment");
464    Uint32 sharedSize = 0;
465    sharedSize =4096;   //start of the buffer is page aligned
466 
467    Uint32 sizeOfBuffer = m_BufferSize;
468    const Uint32 slack = MAX_MESSAGE_SIZE;
469    sizeOfBuffer -= sharedSize;
470 
471    Uint32 *segPtr = (Uint32*) m_TargetSegm[m_ActiveAdapterId].mappedMemory ;
472 
473    Uint32 * remoteReadIndex = (Uint32*)segPtr;
474    Uint32 * remoteWriteIndex = (Uint32*)(segPtr + 1);
475    m_remoteStatusFlag = (Uint32*)(segPtr + 3);
476 
477    char * remoteStartOfBuf = ( char*)((char*)segPtr+(sharedSize));
478 
479    writer = new SHM_Writer(remoteStartOfBuf,
480 			   sizeOfBuffer,
481 			   slack,
482 			   remoteReadIndex,
483 			   remoteWriteIndex);
484 
485    writer->clear();
486 
487    m_TargetSegm[0].writer=writer;
488 
489    m_sendBuffer.m_forceSendLimit = writer->getBufferSize();
490 
491    if(createSequence(m_ActiveAdapterId)!=SCI_ERR_OK) {
492      report_error(TE_SCI_UNABLE_TO_CREATE_SEQUENCE);
493      DBUG_PRINT("error", ("Unable to create sequence on active"));
494      doDisconnect();
495    }
496    if (m_adapters > 1) {
497      segPtr = (Uint32*) m_TargetSegm[m_StandbyAdapterId].mappedMemory ;
498 
499      Uint32 * remoteReadIndex2 = (Uint32*)segPtr;
500      Uint32 * remoteWriteIndex2 = (Uint32*) (segPtr + 1);
501      m_remoteStatusFlag2 = (Uint32*)(segPtr + 3);
502 
503      char * remoteStartOfBuf2 = ( char*)((char *)segPtr+sharedSize);
504 
505      /**
506       * setup a writer. writer2 is used to mirror the changes of
507       * writer on the standby
508       * segment, so that in the case of a failover, we can switch
509       * to the stdby seg. quickly.*
510       */
511      writer2 = new SHM_Writer(remoteStartOfBuf2,
512                               sizeOfBuffer,
513                               slack,
514                               remoteReadIndex2,
515                               remoteWriteIndex2);
516 
517      * remoteReadIndex = 0;
518      * remoteWriteIndex = 0;
519      writer2->clear();
520      m_TargetSegm[1].writer=writer2;
521      if(createSequence(m_StandbyAdapterId)!=SCI_ERR_OK) {
522        report_error(TE_SCI_UNABLE_TO_CREATE_SEQUENCE);
523        DBUG_PRINT("error", ("Unable to create sequence on standby"));
524        doDisconnect();
525      }
526    }
527    DBUG_VOID_RETURN;
528 } //setupRemoteSegment
529 
530 bool
init_local()531 SCI_Transporter::init_local()
532 {
533   DBUG_ENTER("SCI_Transporter::init_local");
534   if(!m_initLocal) {
535     if(initLocalSegment()!=SCI_ERR_OK){
536       NdbSleep_MilliSleep(10);
537       //NDB SHOULD TERMINATE AND COMPUTER REBOOTED!
538       report_error(TE_SCI_CANNOT_INIT_LOCALSEGMENT);
539       DBUG_RETURN(false);
540     }
541     m_initLocal=true;
542   }
543   DBUG_RETURN(true);
544 }
545 
546 bool
init_remote()547 SCI_Transporter::init_remote()
548 {
549   DBUG_ENTER("SCI_Transporter::init_remote");
550   sci_error_t err;
551   Uint32 offset = 0;
552   if(!m_mapped ) {
553     DBUG_PRINT("info", ("Map remote segments"));
554     for(Uint32 i=0; i < m_adapters ; i++) {
555       m_TargetSegm[i].rhm[i].remoteHandle=0;
556       SCIConnectSegment(sciAdapters[i].scidesc,
557                         &(m_TargetSegm[i].rhm[i].remoteHandle),
558                         m_remoteNodes[i],
559                         remoteSegmentId(localNodeId, remoteNodeId),
560                         i,
561                         0,
562                         0,
563                         0,
564                         0,
565                         &err);
566 
567       if(err != SCI_ERR_OK) {
568         NdbSleep_MilliSleep(10);
569         DBUG_PRINT("error", ("Error connecting segment, err 0x%x", err));
570         DBUG_RETURN(false);
571       }
572     }
573     // Map the remote memory segment into program space
574     for(Uint32 i=0; i < m_adapters ; i++) {
575       m_TargetSegm[i].mappedMemory =
576         SCIMapRemoteSegment((m_TargetSegm[i].rhm[i].remoteHandle),
577                             &(m_TargetSegm[i].rhm[i].map),
578                             offset,
579                             m_BufferSize,
580                             NULL,
581                             FLAGS,
582                             &err);
583 
584       if(err!= SCI_ERR_OK) {
585         DBUG_PRINT("error",
586           ("Cannot map a segment to the remote node %d. Error code 0x%x",
587           m_RemoteSciNodeId, err));
588         //NDB SHOULD TERMINATE AND COMPUTER REBOOTED!
589         report_error(TE_SCI_CANNOT_MAP_REMOTESEGMENT);
590         DBUG_RETURN(false);
591       }
592     }
593     m_mapped=true;
594     setupRemoteSegment();
595     setConnected();
596     DBUG_PRINT("info", ("connected and mapped to segment, remoteNode: %d",
597                remoteNodeId));
598     DBUG_PRINT("info", ("remoteSegId: %d",
599                remoteSegmentId(localNodeId, remoteNodeId)));
600     DBUG_RETURN(true);
601   } else {
602     DBUG_RETURN(getConnectionStatus());
603   }
604 }
605 
606 bool
connect_client_impl(NDB_SOCKET_TYPE sockfd)607 SCI_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
608 {
609   SocketInputStream s_input(sockfd);
610   SocketOutputStream s_output(sockfd);
611   char buf[256];
612   DBUG_ENTER("SCI_Transporter::connect_client_impl");
613   // Wait for server to create and attach
614   if (s_input.gets(buf, 256) == 0) {
615     DBUG_PRINT("error", ("No initial response from server in SCI"));
616     NDB_CLOSE_SOCKET(sockfd);
617     DBUG_RETURN(false);
618   }
619   if (!init_local()) {
620     NDB_CLOSE_SOCKET(sockfd);
621     DBUG_RETURN(false);
622   }
623 
624   // Send ok to server
625   s_output.println("sci client 1 ok");
626 
627   if (!init_remote()) {
628     NDB_CLOSE_SOCKET(sockfd);
629     DBUG_RETURN(false);
630   }
631   // Wait for ok from server
632   if (s_input.gets(buf, 256) == 0) {
633     DBUG_PRINT("error", ("No second response from server in SCI"));
634     NDB_CLOSE_SOCKET(sockfd);
635     DBUG_RETURN(false);
636   }
637   // Send ok to server
638   s_output.println("sci client 2 ok");
639 
640   NDB_CLOSE_SOCKET(sockfd);
641   DBUG_PRINT("info", ("Successfully connected client to node %d",
642               remoteNodeId));
643   DBUG_RETURN(true);
644 }
645 
646 bool
connect_server_impl(NDB_SOCKET_TYPE sockfd)647 SCI_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
648 {
649   SocketOutputStream s_output(sockfd);
650   SocketInputStream s_input(sockfd);
651   char buf[256];
652   DBUG_ENTER("SCI_Transporter::connect_server_impl");
653 
654   if (!init_local()) {
655     NDB_CLOSE_SOCKET(sockfd);
656     DBUG_RETURN(false);
657   }
658   // Send ok to client
659   s_output.println("sci server 1 ok");
660 
661   // Wait for ok from client
662   if (s_input.gets(buf, 256) == 0) {
663     DBUG_PRINT("error", ("No response from client in SCI"));
664     NDB_CLOSE_SOCKET(sockfd);
665     DBUG_RETURN(false);
666   }
667 
668   if (!init_remote()) {
669     NDB_CLOSE_SOCKET(sockfd);
670     DBUG_RETURN(false);
671   }
672   // Send ok to client
673   s_output.println("sci server 2 ok");
674   // Wait for ok from client
675   if (s_input.gets(buf, 256) == 0) {
676     DBUG_PRINT("error", ("No second response from client in SCI"));
677     NDB_CLOSE_SOCKET(sockfd);
678     DBUG_RETURN(false);
679   }
680 
681   NDB_CLOSE_SOCKET(sockfd);
682   DBUG_PRINT("info", ("Successfully connected server to node %d",
683               remoteNodeId));
684   DBUG_RETURN(true);
685 }
686 
createSequence(Uint32 adapterid)687 sci_error_t SCI_Transporter::createSequence(Uint32 adapterid) {
688   sci_error_t err;
689   SCICreateMapSequence((m_TargetSegm[adapterid].rhm[adapterid].map),
690 		       &(m_TargetSegm[adapterid].sequence),
691 		       SCI_FLAG_FAST_BARRIER,
692 		       &err);
693   return err;
694 } // createSequence()
695 
disconnectLocal()696 bool SCI_Transporter::disconnectLocal()
697 {
698   DBUG_ENTER("SCI_Transporter::disconnectLocal");
699   sci_error_t err;
700   m_ActiveAdapterId=0;
701 
702   /** Free resources used by a local segment
703    */
704 
705   SCIUnmapSegment(m_SourceSegm[0].lhm[0].map,0,&err);
706   if(err!=SCI_ERR_OK) {
707     report_error(TE_SCI_UNABLE_TO_UNMAP_SEGMENT);
708     DBUG_PRINT("error", ("Unable to unmap segment"));
709     DBUG_RETURN(false);
710   }
711 
712   SCIRemoveSegment((m_SourceSegm[m_ActiveAdapterId].localHandle),
713 		   FLAGS,
714 		   &err);
715 
716   if(err!=SCI_ERR_OK) {
717     report_error(TE_SCI_UNABLE_TO_REMOVE_SEGMENT);
718     DBUG_PRINT("error", ("Unable to remove segment"));
719     DBUG_RETURN(false);
720   }
721   DBUG_PRINT("info", ("Local memory segment is unmapped and removed"));
722   DBUG_RETURN(true);
723 } // disconnectLocal()
724 
725 
disconnectRemote()726 bool SCI_Transporter::disconnectRemote()  {
727   DBUG_ENTER("SCI_Transporter::disconnectRemote");
728   sci_error_t err;
729   for(Uint32 i=0; i<m_adapters; i++) {
730     /**
731      * Segment unmapped, disconnect from the remotely connected segment
732      */
733     SCIUnmapSegment(m_TargetSegm[i].rhm[i].map,0,&err);
734     if(err!=SCI_ERR_OK) {
735       report_error(TE_SCI_UNABLE_TO_UNMAP_SEGMENT);
736       DBUG_PRINT("error", ("Unable to unmap segment"));
737       DBUG_RETURN(false);
738     }
739 
740     SCIDisconnectSegment(m_TargetSegm[i].rhm[i].remoteHandle,
741 			 FLAGS,
742 			 &err);
743     if(err!=SCI_ERR_OK) {
744       report_error(TE_SCI_UNABLE_TO_DISCONNECT_SEGMENT);
745       DBUG_PRINT("error", ("Unable to disconnect segment"));
746       DBUG_RETURN(false);
747     }
748     DBUG_PRINT("info", ("Remote memory segment is unmapped and disconnected"));
749   }
750   DBUG_RETURN(true);
751 } // disconnectRemote()
752 
753 
~SCI_Transporter()754 SCI_Transporter::~SCI_Transporter() {
755   DBUG_ENTER("SCI_Transporter::~SCI_Transporter");
756   // Close channel to the driver
757   doDisconnect();
758   if(m_sendBuffer.m_buffer != NULL)
759     delete[] m_sendBuffer.m_buffer;
760   DBUG_VOID_RETURN;
761 } // ~SCI_Transporter()
762 
closeSCI()763 void SCI_Transporter::closeSCI() {
764   // Termination of SCI
765   sci_error_t err;
766   DBUG_ENTER("SCI_Transporter::closeSCI");
767 
768   // Disconnect and remove remote segment
769   disconnectRemote();
770 
771   // Unmap and remove local segment
772 
773   disconnectLocal();
774 
775   // Closes an SCI virtual device
776   SCIClose(activeSCIDescriptor, FLAGS, &err);
777 
778   if(err != SCI_ERR_OK) {
779     DBUG_PRINT("error",
780       ("Cannot close SCI channel to the driver. Error code 0x%x",
781       err));
782   }
783   SCITerminate();
784   DBUG_VOID_RETURN;
785 } // closeSCI()
786 
787 Uint32 *
getWritePtr(Uint32 lenBytes,Uint32 prio)788 SCI_Transporter::getWritePtr(Uint32 lenBytes, Uint32 prio)
789 {
790 
791   Uint32 sci_buffer_remaining = m_sendBuffer.m_forceSendLimit;
792   Uint32 send_buf_size = m_sendBuffer.m_sendBufferSize;
793   Uint32 curr_data_size = m_sendBuffer.m_dataSize << 2;
794   Uint32 new_curr_data_size = curr_data_size + lenBytes;
795   if ((curr_data_size >= send_buf_size) ||
796       (curr_data_size >= sci_buffer_remaining)) {
797     /**
798      * The new message will not fit in the send buffer. We need to
799      * send the send buffer before filling it up with the new
800      * signal data. If current data size will spill over buffer edge
801      * we will also send to ensure correct operation.
802      */
803     if (!doSend()) {
804       /**
805        * We were not successfull sending, report 0 as meaning buffer full and
806        * upper levels handle retries and other recovery matters.
807        */
808       return 0;
809     }
810   }
811   /**
812    * New signal fits, simply fill it up with more data.
813    */
814   Uint32 sz = m_sendBuffer.m_dataSize;
815   return &m_sendBuffer.m_buffer[sz];
816 }
817 
818 void
updateWritePtr(Uint32 lenBytes,Uint32 prio)819 SCI_Transporter::updateWritePtr(Uint32 lenBytes, Uint32 prio){
820 
821   Uint32 sz = m_sendBuffer.m_dataSize;
822   Uint32 packet_size = m_PacketSize;
823   sz += ((lenBytes + 3) >> 2);
824   m_sendBuffer.m_dataSize = sz;
825 
826   if(sz > packet_size) {
827     /**-------------------------------------------------
828      * Buffer is full and we are ready to send. We will
829      * not wait since the signal is already in the buffer.
830      * Force flag set has the same indication that we
831      * should always send. If it is not possible to send
832      * we will not worry since we will soon be back for
833      * a renewed trial.
834      *-------------------------------------------------
835      */
836     doSend();
837   }
838 }
839 
/**
 * Values stored in the status words of the shared segments
 * (m_localStatusFlag, m_remoteStatusFlag, m_remoteStatusFlag2); written by
 * setConnected()/setDisconnect() and tested by getConnectionStatus() and
 * checkConnected().
 */
enum SciStatus {
  SCIDISCONNECT = 1,  // written by setDisconnect()
  SCICONNECTED  = 2   // written by setConnected()
};
844 
845 bool
getConnectionStatus()846 SCI_Transporter::getConnectionStatus() {
847   if(*m_localStatusFlag == SCICONNECTED &&
848      (*m_remoteStatusFlag == SCICONNECTED ||
849      ((m_adapters > 1) &&
850       *m_remoteStatusFlag2 == SCICONNECTED)))
851     return true;
852   else
853     return false;
854 }
855 
/**
 * Mark the connection as established by writing SCICONNECTED to the remote
 * status flag(s) and then to the local status flag. The second remote flag
 * is written only when more than one adapter is in use.
 */
void
SCI_Transporter::setConnected() {
  *m_remoteStatusFlag = SCICONNECTED;
  if (m_adapters > 1) {
    *m_remoteStatusFlag2 = SCICONNECTED;
  }
  // The local flag is written last, after the remote flag(s).
  *m_localStatusFlag = SCICONNECTED;
}
864 
865 void
setDisconnect()866 SCI_Transporter::setDisconnect() {
867   if(getLinkStatus(m_ActiveAdapterId))
868     *m_remoteStatusFlag = SCIDISCONNECT;
869   if (m_adapters > 1) {
870     if(getLinkStatus(m_StandbyAdapterId))
871       *m_remoteStatusFlag2 = SCIDISCONNECT;
872   }
873 }
874 
875 bool
checkConnected()876 SCI_Transporter::checkConnected() {
877   if (*m_localStatusFlag == SCIDISCONNECT) {
878     return false;
879   }
880   else
881     return true;
882 }
883 
884 static bool init = false;
885 
886 bool
initSCI()887 SCI_Transporter::initSCI() {
888   DBUG_ENTER("SCI_Transporter::initSCI");
889   if(!init){
890     sci_error_t error;
891     // Initialize SISCI library
892     SCIInitialize(0, &error);
893     if(error != SCI_ERR_OK)  {
894       DBUG_PRINT("error", ("Cannot initialize SISCI library."));
895       DBUG_PRINT("error",
896       ("Inconsistency between SISCI library and SISCI driver. Error code 0x%x",
897       error));
898       DBUG_RETURN(false);
899     }
900     init = true;
901   }
902   DBUG_RETURN(true);
903 }
904 
905 Uint32
get_free_buffer() const906 SCI_Transporter::get_free_buffer() const
907 {
908   return (m_TargetSegm[m_ActiveAdapterId].writer)->get_free_buffer();
909 }
910 
911