1 
2 #include "normSocket.h"
3 #include <stdio.h>  // for stderr
4 #include <assert.h> // for assert()
5 #include <string.h>  // for strlen()
6 #include <arpa/inet.h>  // for inet_ntoa() (TBD - change to use Protolib routines?)
7 
8 // COMPILE: (assumes "normApi.h" in "include" ...
9 // g++ -I../include -c normSocket.cpp
10 
11 
12 // This "NormSocket" class is used to maintain tx/rx state for a NORM "socket" connection.
13 // At the moment this "socket" connection represents a single, bi-directional NORM_OBJECT_STREAM
14 // in either a unicast context or an asymmetric "server" multicast stream to possibly multiple "client"
15 // nodes with individual unicast streams in return from those "client" nodes. (I.e., the server will need to
16 // have a normSocket per client even for the server multicast case (maybe :-) )
17 
18 const NormSocketHandle NORM_SOCKET_INVALID = (NormSocketHandle)0;
19 
20 const double NORM_DEFAULT_CONNECT_TIMEOUT = 60.0;
21 
22 // a 'helper' function we use for debugging
NormNodeGetAddressString(NormNodeHandle node)23 const char* NormNodeGetAddressString(NormNodeHandle node)
24 {
25     char addr[16];  // big enough for IPv6
26     unsigned int addrLen = 16;
27     UINT16 port;
28     if (NormNodeGetAddress(node, addr, &addrLen, &port))
29     {
30         static char text[64];
31         text[0] = text[31] = '\0';
32         int addrFamily;
33         if (4 == addrLen)
34             addrFamily = AF_INET;
35         else
36             addrFamily = AF_INET6;
37         inet_ntop(addrFamily, addr, text, 31);
38         sprintf(text + strlen(text), "/%hu", port);
39         return text;
40     }
41     else
42     {
43         return "???";
44     }
45 }  // end NormNodeGetAddressString()
46 
47 class NormSocket
48 {
49     public:
50         NormSocket(NormSessionHandle normSession = NORM_SESSION_INVALID);
51 
52         // These methods identify the role of this socket with respect
53         // to the client / server relationship (a "server socket" is
54         // one for which NormListen() has been invoked).
IsServerSocket() const55         bool IsServerSocket() const
56             {return (server_socket == this);}
IsClientSocket() const57         bool IsClientSocket() const
58             {return (server_socket != this);}
IsUnicastSocket() const59         bool IsUnicastSocket() const
60             {return (NORM_SESSION_INVALID == mcast_session);}
IsMulticastSocket() const61         bool IsMulticastSocket() const
62             {return !IsUnicastSocket();}
IsMulticastServer() const63         bool IsMulticastServer() const
64             {return (IsMulticastSocket() && IsServerSocket());}
IsMulticastClient() const65         bool IsMulticastClient() const
66             {return (IsMulticastSocket() && IsClientSocket());}
IsServerSide() const67         bool IsServerSide() const
68             {return (NULL != server_socket);}
IsClientSide() const69         bool IsClientSide() const
70             {return (NULL == server_socket);}
71 
GetSession() const72         NormSessionHandle GetSession() const
73             {return norm_session;}
GetMulticastSession() const74         NormSessionHandle GetMulticastSession() const
75             {return mcast_session;}
76 
InitRxStream(NormObjectHandle rxStream)77         void InitRxStream(NormObjectHandle rxStream)
78             {rx_stream = rxStream;}
GetRxStream() const79         NormObjectHandle GetRxStream() const
80             {return rx_stream;}
81 
InitTxStream(NormObjectHandle txStream,unsigned int bufferSize,UINT16 segmentSize,UINT16 blockSize)82         void InitTxStream(NormObjectHandle txStream, unsigned int bufferSize, UINT16 segmentSize, UINT16 blockSize)
83         {
84             tx_stream = txStream;
85             tx_segment_size = segmentSize;
86             tx_stream_buffer_max = NormGetStreamBufferSegmentCount(bufferSize, segmentSize, blockSize);
87             tx_stream_buffer_max -= blockSize;  // a little safety margin (perhaps not necessary)
88             tx_stream_buffer_count = 0;
89             tx_stream_bytes_remain = 0;
90             tx_watermark_pending = false;
91         }
92 
93 
94         bool Listen(NormInstanceHandle instance, UINT16 serverPort, const char* groupAddr);
95         NormSocket* Accept(NormNodeHandle client, NormInstanceHandle instance = NORM_INSTANCE_INVALID);
96         bool Connect(NormInstanceHandle instance, const char* serverAddr, UINT16 serverPort, const char* groupAddr, NormNodeId clientId);
97 
98 
99         // Write to tx stream (with flow control)
100         unsigned int Write(const char* buffer, unsigned int numBytes);
101         void Flush(bool eom = false, NormFlushMode flushMode = NORM_FLUSH_ACTIVE);
102         // Read from rx_stream
103         bool Read(char* buffer, unsigned int& numBytes);
104 
105         // "graceful" shutdown (stream is flushed and stream end, etc)
106         void Shutdown();
107 
108         // hard, immediate closure
109         void Close();
110 
111         void GetSocketEvent(const NormEvent& event, NormSocketEvent& socketEvent);
112 
113         typedef enum State
114         {
115             CLOSED,
116             LISTENING,
117             CONNECTING,
118             ACCEPTING,
119             CONNECTED,
120             CLOSING
121         } State;
122 
AddAckingNode(NormNodeId nodeId)123         bool AddAckingNode(NormNodeId nodeId)
124         {
125             if (NormAddAckingNode(norm_session, nodeId))
126             {
127                 client_count++;
128                 return true;
129             }
130             else
131             {
132                 return false;
133             }
134         }
RemoveAckingNode(NormNodeId nodeId)135         void RemoveAckingNode(NormNodeId nodeId)
136             {NormRemoveAckingNode(norm_session, nodeId);}
137 
138 
139     private:
140         State               socket_state;
141         NormSessionHandle   norm_session;
142         NormSessionHandle   mcast_session;   // equals norm_session for a multicast server
143         NormSocket*         server_socket;   // only applies to server-side sockets
144         unsigned int        client_count;    // only applies to mcast server sockets
145         NormNodeId          client_id;       // only applies to mcast client socket
146         NormNodeHandle      remote_node;     //
147         // Send stream and associated flow control state variables
148         NormObjectHandle    tx_stream;
149         bool                tx_ready;
150         UINT16              tx_segment_size;
151         unsigned int        tx_stream_buffer_max;
152         unsigned int        tx_stream_buffer_count;
153         unsigned int        tx_stream_bytes_remain;
154         bool                tx_watermark_pending;
155         // Receive stream state
156         NormObjectHandle    rx_stream;
157 
158 };  // end class NormSocket
159 
160 
NormSocket(NormSessionHandle normSession)161 NormSocket::NormSocket(NormSessionHandle normSession)
162  : socket_state(CLOSED), norm_session(normSession),
163    mcast_session(NORM_SESSION_INVALID), server_socket(NULL),
164    client_count(0), client_id(NORM_NODE_NONE), remote_node(NORM_NODE_INVALID),
165    tx_stream(NORM_OBJECT_INVALID), tx_ready(false), tx_segment_size(0),
166    tx_stream_buffer_max(0), tx_stream_buffer_count(0),
167    tx_stream_bytes_remain(0), tx_watermark_pending(false),
168    rx_stream(NORM_OBJECT_INVALID)
169 {
170     // For now we use the NormSession "user data" option to associate
171     // the session with a "socket".  In the future we may add a
172     // dedicated NormSetSocket(NormSessionHandle session, NormSocketHandle normSocket) API
173     // to keep the "user data" feature available for other purposes
174     if (NORM_SESSION_INVALID != normSession) // this should always be true
175         NormSetUserData(normSession, this);
176 }
177 
178 
Listen(NormInstanceHandle instance,UINT16 serverPort,const char * groupAddr)179 bool NormSocket::Listen(NormInstanceHandle instance, UINT16 serverPort, const char* groupAddr)
180 {
181     if (CLOSED != socket_state)
182     {
183         fprintf(stderr, "NormSocket::Listen() error: socket already open?!\n");
184         return false;
185     }
186 
187     if (NULL != groupAddr)
188     {
189         // TBD - validate that "groupAddr" is indeed a multicast address
190         norm_session = NormCreateSession(instance, groupAddr, serverPort, NORM_NODE_ANY);
191         NormSetTxPort(norm_session, serverPort); // can't do this and receive unicast feedback
192         mcast_session = norm_session;
193     }
194     else
195     {
196         // For unicast , the "server" has a NormNodeId of '1' and the "clients" are '2'
197         // to obviate need for explicit id management and will allow NAT to work, etc
198         norm_session = NormCreateSession(instance, "127.0.0.1", serverPort, 1);
199     }
200     if (NORM_SESSION_INVALID == norm_session)
201     {
202         fprintf(stderr, "NormSocket::Listen() error: NormCreateSession() failure\n");
203         return false;
204     }
205     NormSetUserData(norm_session, this);
206     // Note the port reuse here lets us manage our "client" rx-only unicast connections the
207     // way we need, but does allow a second multicast server to be started on this group which leads
208     // to undefined behavior.  TBD - see if we can prevent via binding wizardry
209     // (How is it done for TCP servers? - probably because the accept() call is in the network stack
210     //  instead of user-space) Perhaps we could have a semaphore lock to block a second "server"
211     NormSetRxPortReuse(norm_session, true);
212 
213     // use default sync policy so a "serversocket" doesn't NACK the senders it detects
214     // NORM_SYNC_STREAM tries to get everything the sender has cached/buffered
215     //NormSetDefaultSyncPolicy(norm_session, NORM_SYNC_STREAM);
216     //NormSetDefaultSyncPolicy(norm_session, NORM_SYNC_ALL);
217 
218     if (NULL == groupAddr)
219     {
220         // Unicast server
221         // Note we use a small buffer size here since a "listening" socket isn't
222         // going to be receiving data (TBD - implement a mechanism to handoff remote
223         // sender (i.e. "client") from parent
224         if (!NormStartReceiver(norm_session, 2048))
225         {
226             fprintf(stderr, "NormSocket::Listen() error: NormStartReceiver() failure (perhaps port already in use)\n");
227             NormDestroySession(norm_session);
228             norm_session = NORM_SESSION_INVALID;
229         }
230     }
231     else
232     {
233         //NormSetMulticastInterface(norm_session, "lo0");
234         NormSetMulticastLoopback(norm_session, true);  // for testing
235         if (!NormStartReceiver(norm_session, 2048))
236         {
237             fprintf(stderr, "NormSocket::Listen() error: NormStartReceiver() failure (perhaps port already in use)\n");
238             NormDestroySession(norm_session);
239             norm_session = NORM_SESSION_INVALID;
240         }
241         // TBD - We _could_ go ahead and call NormStartSender(), but for now we'll wait until we hear the application
242         //       makes at least one NormAccept() call ...
243     }
244     server_socket  = this;
245     socket_state = LISTENING;
246     return true;
247 }  // end NormSocket::Listen()
248 
Accept(NormNodeHandle client,NormInstanceHandle instance)249 NormSocket* NormSocket::Accept(NormNodeHandle client, NormInstanceHandle instance)
250 {
251     if (!IsServerSocket()) return NULL;
252     char clientAddr[64];
253     clientAddr[63] = '\0';
254     char addr[16]; // big enough for IPv6
255     unsigned int addrLen = 16;
256     UINT16 clientPort;
257     NormNodeGetAddress(client, addr, &addrLen, &clientPort);
258     int addrFamily;
259     UINT8 version;
260     if (4 == addrLen)
261     {
262         addrFamily = AF_INET;
263         version = 4;
264     }
265     else
266     {
267         addrFamily = AF_INET6;
268         version = 6;
269     }
270     inet_ntop(addrFamily, addr, clientAddr, 63);
271 
272     UINT16 serverPort = NormGetRxPort(norm_session);
273     if (NORM_INSTANCE_INVALID == instance)
274         instance = NormGetInstance(norm_session);
275 
276     NormSessionHandle clientSession = NormCreateSession(instance, clientAddr, serverPort, 1);
277 
278     NormSetTxPort(clientSession, serverPort, false);
279 
280     // This next API call will cause NORM to tightly bind the remote client src addr/port to
281     // our server port so the "clientSession" captures the client packets instead of the "server" session
282 
283     // Any new packets will come to our new connected clientSession instead
284     // However, note that even though we've "connected" this sender,
285     // there is a chance that additional packets in the "serverSession"
286     // rx socket buffer may look like a new sender if deleted now, so
287     // we wait for NORM_REMOTE_SENDER_INACTIVE to delete
288 
289     // NORM_SYNC_STREAM tries to get everything the sender has cached/buffered
290     //NormSetDefaultSyncPolicy(norm_session, NORM_SYNC_STREAM);
291     NormSetDefaultSyncPolicy(norm_session, NORM_SYNC_ALL);
292 
293     NormSetRxPortReuse(clientSession, true, 0, clientAddr, clientPort);  // "connects" to remote client addr/port
294     NormSetDefaultUnicastNack(clientSession, true);
295 
296     NormStartReceiver(clientSession, 2*1024*1024);
297 
298     NormSocket* clientSocket = new NormSocket(clientSession);
299     clientSocket->server_socket = this;  // this is a server-side socket
300     clientSocket->remote_node = client;
301     NormNodeSetUserData(client, clientSocket);
302 
303     NormNodeId clientId = NormNodeGetId(client);
304 
305     if (IsUnicastSocket())
306     {
307         // The clientSession is bi-directional so we need to NormStartSender(), etc
308         NormAddAckingNode(clientSession, 2); //clientId);
309         NormSetFlowControl(clientSession, 0);  // disable timer-based flow control since we do explicit, ACK-based flow control
310         NormStartSender(clientSession, NormGetRandomSessionId(), 2*1024*1024, 1400, 16, 4);
311     }
312     else // if IsMulticastSocket()
313     {
314         // TBD - should we make sure this not a NormNodeId we already have?
315         // TBD - should we wait to add the client as acking node until CONNECT
316         //       (probably for heavyweight; for lightweight we know the client
317         //        has already started his multicast receiver)
318         AddAckingNode(clientId);  // TBD - check result
319         NormNodeHandle node = NormGetAckingNodeHandle(mcast_session, clientId);
320         NormNodeSetUserData(node, clientSocket); // a way to track mcast client sockets
321         clientSocket->mcast_session = mcast_session;
322         clientSocket->client_id = client_id;
323         if (LISTENING == socket_state)
324         {
325             NormSetFlowControl(norm_session, 0);  // disable timer-based flow control since we do explicit, ACK-based flow control
326             NormStartSender(norm_session, NormGetRandomSessionId(), 2*1024*1024, 1400, 16, 4);
327             socket_state = CONNECTED;
328         }
329         /* The code below would be invoked for "heavyweight" mcast client admission
330           (for the moment we go with a "lightweight" model - this might be invokable upon
331            as an optional behavior later)
332 
333         // Here, we start the clientSession (w/ a minimal buffer size) and create a temporary sender
334         // stream that is immediately flushed/closed to inform the "client" that his connection
335         // has been accepted.  The sender function is terminated upon client acknowledgement
336         NormAddAckingNode(clientSession, clientId);
337         NormSetFlowControl(clientSession, 0);  // disable timer-based flow control since we do explicit, ACK-based flow control
338         NormStartSender(clientSession, NormGetRandomSessionId(), 1024, 512, 1, 0);
339         NormObjectHandle tempStream = NormStreamOpen(clientSession, 1024);
340         NormStreamClose(tempStream, true);  // Note our "trick" here to do a graceful close, _then_ watermark to get ack
341         NormSetWatermark(clientSession, tempStream, true);  // future NORM API will add "bool watermark" option to graceful close
342         */
343     }
344     clientSocket->socket_state = ACCEPTING;  // will transision to CONNECTED when client is detected on new clientSession
345     return clientSocket;
346 }  // end NormSocket::Accept()
347 
348 // TBD - provide options for binding to a specific local address, interface, etc
Connect(NormInstanceHandle instance,const char * serverAddr,UINT16 serverPort,const char * groupAddr,NormNodeId clientId)349 bool NormSocket::Connect(NormInstanceHandle instance, const char* serverAddr, UINT16 serverPort, const char* groupAddr, NormNodeId clientId)
350 {
351     // For unicast connections, the "client" manages a single NormSession for send and receive
352     // (For multicast connections, there are two sessions: The same unicast session that will
353     //  be set to txOnly upon CONNECT and a NormSession for multicast reception)
354     norm_session = NormCreateSession(instance, "127.0.0.1", 0, clientId);  // TBD - use "clientId" here for mcast sockets?
355     if (NORM_SESSION_INVALID == norm_session)
356     {
357         fprintf(stderr, "NormSocket::Connect() error: NormCreateSession() failure\n");
358         return false;
359     }
360     // NORM_SYNC_STREAM tries to get everything the sender has cached/buffered
361     //NormSetDefaultSyncPolicy(norm_session, NORM_SYNC_STREAM);
362     NormSetDefaultSyncPolicy(norm_session, NORM_SYNC_ALL);
363 
364     NormSetUserData(norm_session, this);
365     NormSetRxPortReuse(norm_session, true, NULL, serverAddr, serverPort);
366     // TBD - for a multicast connection, the unicast receiver could be started with minimal buffer
367     // (not that it matters since the buffers aren't activated until a sender starts sending _data_)
368     if (!NormStartReceiver(norm_session, 2*1024*1024))  // to get ephemeral port assigned
369     {
370         fprintf(stderr, "NormSocket::Connect() error: unicast NormStartReceiver() failure\n");
371         return false;
372     }
373     NormChangeDestination(norm_session, serverAddr, serverPort, false); // "connect" our NORM tx_socket (so we can get ICMP)
374     NormSessionId sessionId = NormGetRandomSessionId();
375     NormAddAckingNode(norm_session, 1);  // servers always have NormNodeId '1'
376     NormSetFlowControl(norm_session, 0); // since we do explicit, ACK-based flow control
377     if (!NormStartSender(norm_session, sessionId, 2*1024*1024, 1400, 16, 4))
378     {
379         fprintf(stderr, "NormSocket::Connect() error: NormStartSender() failure\n");
380         return false;
381     }
382 
383     if (NULL != groupAddr)
384     {
385         // Create the "mcast_session" for multicast reception
386         mcast_session = NormCreateSession(instance, groupAddr, serverPort, clientId);
387         //NormSetTxPort(mcast_session, serverPort);  // TBD - not sure this is a good idea if multiple clients on a machine?
388         NormSetUserData(mcast_session, this);
389         // NORM_SYNC_STREAM tries to get everything the sender has cached/buffered
390         //NormSetDefaultSyncPolicy(mcast_session, NORM_SYNC_STREAM);
391         NormSetDefaultSyncPolicy(mcast_session, NORM_SYNC_ALL);
392 
393         NormSetDefaultUnicastNack(mcast_session, true);  // we could optionally allow multicast NACKing, too
394         NormSetMulticastLoopback(norm_session, true);  // for testing
395         client_id = clientId;
396         // TBD - make this SSM??? ... this would allow for multiple servers using the same groupAddr/port
397         NormSetRxPortReuse(mcast_session, true, groupAddr);  // Should we upgrade rx port reuse and 'connect' to server tx port upon CONNECT?
398         // For a "lightweight" client->server connection establishment, we go ahead and
399         // stop our unicast receiver and start multicast receiver, assuming the server
400         // will admit us into the group.
401         // (TBD - provide a "heavier weight" connection acceptance confirmation/denial signal from server
402         // via unicast from server -> client here (i.e. keep the unicast receiver open)
403         if (!NormStartReceiver(mcast_session, 2*1024*1024))  // to get ephemeral port assigned
404         {
405             fprintf(stderr, "NormSocket::Connect() error: multicast NormStartReceiver() failure\n");
406             return false;
407         }
408     }
409     else
410     {
411         // Set timeout for connect attempt (for "heavyweight" mcast connect, this would also be done)
412         NormSetUserTimer(norm_session, NORM_DEFAULT_CONNECT_TIMEOUT);
413     }
414     server_socket = NULL;  // this is a client-side socket
415 
416     socket_state = CONNECTING;
417 
418     return true;
419 }  // end NormSocket::Connect()
420 
421 
Write(const char * buffer,unsigned int numBytes)422 unsigned int NormSocket::Write(const char* buffer, unsigned int numBytes)
423 {
424     // TBD - make sure the socket is CONNECTED first
425     if (IsMulticastClient() && IsServerSide())
426     {
427         // This is multicast server rxonly client socket, so we redirect
428         // the write() to the associated txonly multicast socket
429         return server_socket->Write(buffer, numBytes);
430     }
431     // TBD - if tx_stream not yet open, open it!!!
432     if (NORM_OBJECT_INVALID == tx_stream)
433     {
434         tx_stream = NormStreamOpen(norm_session, 2*1024*1024);
435         InitTxStream(tx_stream, 2*1024*1024, 1400, 16);
436     }
437 
438     // This method uses NormStreamWrite(), but limits writes by explicit ACK-based flow control status
439     if (tx_stream_buffer_count < tx_stream_buffer_max)
440     {
441         // 1) How many buffer bytes are available?
442         unsigned int bytesAvailable = tx_segment_size * (tx_stream_buffer_max - tx_stream_buffer_count);
443         bytesAvailable -= tx_stream_bytes_remain;  // unflushed segment portiomn
444         if (numBytes <= bytesAvailable)
445         {
446             unsigned int totalBytes = numBytes + tx_stream_bytes_remain;
447             unsigned int numSegments = totalBytes / tx_segment_size;
448             tx_stream_bytes_remain = totalBytes % tx_segment_size;
449             tx_stream_buffer_count += numSegments;
450         }
451         else
452         {
453             numBytes = bytesAvailable;
454             tx_stream_buffer_count = tx_stream_buffer_max;
455         }
456         // 2) Write to the stream
457         unsigned int bytesWritten = NormStreamWrite(tx_stream, buffer, numBytes);
458         //assert(bytesWritten == numBytes);  // this could happen if timer-based flow control is left enabled
459         // 3) Check if we need to issue a watermark ACK request?
460         if (!tx_watermark_pending && (tx_stream_buffer_count >= (tx_stream_buffer_max / 2)))
461         {
462             //fprintf(stderr, "tx_engine_t::WriteToNormStream() initiating watermark ACK request (buffer count:%lu max:%lu usage:%u)...\n",
463             //            tx_stream_buffer_count, tx_stream_buffer_max, NormStreamGetBufferUsage(tx_stream));
464             NormSetWatermark(norm_session, tx_stream);
465             tx_watermark_pending = true;
466         }
467         return bytesWritten;
468     }
469     else
470     {
471         return 0;
472     }
473 }  // end NormSocket::Write()
474 
Flush(bool eom,NormFlushMode flushMode)475 void NormSocket::Flush(bool eom, NormFlushMode flushMode)
476 {
477     // TBD - make sure the socket is CONNECTED first
478     if (IsMulticastClient() && IsServerSide())
479     {
480         // This is multicast server rxOnly client socket, so we redirect
481         // the flush() to the associated txonly multicast socket
482         return server_socket->Flush(eom, flushMode);
483     }
484 
485     // NormStreamFlush always will transmit pending runt segments, if applicable
486     // (thus we need to manage our buffer counting accordingly if pending bytes remain)
487     if (tx_watermark_pending)
488     {
489         NormStreamFlush(tx_stream, eom, flushMode);
490     }
491     else if (NORM_FLUSH_ACTIVE == flushMode)
492     {
493         // we flush passive, because watermark forces active ack request
494         NormStreamFlush(tx_stream, eom, NORM_FLUSH_PASSIVE);
495         NormSetWatermark(norm_session, tx_stream, true);
496     }
497     else
498     {
499         NormStreamFlush(tx_stream, eom, flushMode);
500     }
501 
502     if (0 != tx_stream_bytes_remain)
503     {
504         // The flush forces the runt segment out, so we increment our buffer usage count
505         tx_stream_buffer_count++;
506         tx_stream_bytes_remain = 0;
507         if (!tx_watermark_pending && (tx_stream_buffer_count >= (tx_stream_buffer_max >> 1)))
508         {
509             //fprintf(stderr, "tx_engine_t::stream_flush() initiating watermark ACK request (buffer count:%lu max:%lu usage:%u)...\n",
510             //       tx_stream_buffer_count, tx_stream_buffer_max);
511             NormSetWatermark(norm_session, tx_stream, true);
512             tx_watermark_pending = true;
513         }
514     }
515  }  // end NormSocket::Flush()
516 
Read(char * buffer,unsigned int & numBytes)517  bool NormSocket::Read(char* buffer, unsigned int& numBytes)
518  {
519      // TBD - make sure rx_stream is valid!
520      // TBD - make sure this is not a tx only client socket ...
521      return NormStreamRead(rx_stream, buffer, &numBytes);
522  }  // end NormSocket::Read()
523 
Shutdown()524  void NormSocket::Shutdown()
525  {
526      if ((NORM_OBJECT_INVALID == tx_stream)  ||
527          (IsServerSide() && IsMulticastClient()))
528      {
529          Close();  // close immediately since this socket doesn't control a tx_stream
530      }
531      else
532      {
533          // It controls a tx_stream, so shutdown the tx_stream gracefully
534          NormStreamClose(tx_stream, true);  // Note our "trick" here to do a graceful close, _then_ watermark to get ack
535          NormSetWatermark(norm_session, tx_stream, true);  // future NORM API will add "bool watermark" option to graceful close
536          socket_state = CLOSING;
537      }
538  }  // end NormSocket::Shutdown()
539 
Close()540  void NormSocket::Close()
541  {
542      if (IsMulticastSocket())
543      {
544          if (IsServerSide())
545          {
546              if (IsServerSocket())
547              {
548                  // IsMulticastSocket() guarantees the mcast_session is valid
549                  // Dissociate remaining clients from this session and set their
550                  // timer so that NORM_SOCKET_CLOSED events are dispatched for them
551                  NormNodeId nodeId = NORM_NODE_NONE;
552                  while (NormGetNextAckingNode(mcast_session, &nodeId))
553                  {
554                      NormNodeHandle node = NormGetAckingNodeHandle(mcast_session, nodeId);
555                      assert(NORM_NODE_INVALID != node);
556                      NormSocket* clientSocket = (NormSocket*)NormNodeGetUserData(node);
557                      NormSetUserTimer(clientSocket->norm_session, 0.0);
558                  }
559                  // for mcast server mcast_session == norm_session so it's destroyed below
560              }
561              else
562              {
563                  // "IsServerSide()" guarantees the "server_socket" is non-NULL
564                  // server-side multicast client socket closing, so we
565                  // need to remove this "client" NormNodeId from the mcast
566                  // session's acking node list
567                  server_socket->RemoveAckingNode(client_id);
568             }
569         }
570         else  // client-side multicast socket, so we need to destroy mcast_session, too
571         {
572             NormDestroySession(mcast_session);
573         }
574         mcast_session = NORM_SESSION_INVALID;
575      }
576      if (NORM_SESSION_INVALID != norm_session)
577      {
578          NormDestroySession(norm_session);
579          norm_session = NORM_SESSION_INVALID;
580      }
581      server_socket = NULL;
582      tx_stream = NORM_OBJECT_INVALID;
583      tx_segment_size = 0;
584      tx_stream_buffer_max = tx_stream_buffer_count = tx_stream_bytes_remain = 0;
585      tx_watermark_pending = false;
586      rx_stream = NORM_OBJECT_INVALID;
587      socket_state = CLOSED;
588  }  // end NormSocket::Close()
589 
590 
GetSocketEvent(const NormEvent & event,NormSocketEvent & socketEvent)591 void NormSocket::GetSocketEvent(const NormEvent& event, NormSocketEvent& socketEvent)
592 {
593     socketEvent.socket = (NormSocketHandle)this;
594     socketEvent.type = NORM_SOCKET_NONE;  // default socket event type if no socket-specific state change occurs
595     socketEvent.event = event;
596     switch (event.type)
597     {
598         case NORM_TX_QUEUE_EMPTY:
599         case NORM_TX_QUEUE_VACANCY:
600         {
601             // The socket may be tx ready, so issue a NORM_SOCKET_WRITE event
602             if (CONNECTED == socket_state)
603             {
604                 if (!tx_ready)
605                 {
606                     tx_ready = true;
607                     socketEvent.type = NORM_SOCKET_WRITE;
608                 }
609             }
610             break;
611         }
612         case NORM_TX_WATERMARK_COMPLETED:
613         {
614             switch (socket_state)
615             {
616                 case ACCEPTING:
617                 {
618                     // This only comes into play for the "confirmed connection"
619                     // model for multicast sockets (not yet implemented)
620                     assert(0);
621                     assert(IsServerSide() && IsMulticastClient());
622                     if (NORM_ACK_SUCCESS == NormGetAckingStatus(norm_session))
623                     {
624                         // Client has acknowledged our acceptance
625                         socketEvent.type = NORM_SOCKET_CONNECT;
626                         NormStopSender(norm_session);  // the mcast_session is our tx channel
627                         break;
628                     }
629                     else
630                     {
631                         // Client didn't acknowledge, so we cull him from our server
632                         Close();
633                         socketEvent.type = NORM_SOCKET_CLOSED;
634                     }
635                     break;
636                 }
637                 case CLOSING:
638                 {
639                     // Socket that was shutdown has either been acknowledged or timed out
640                     // TBD - should we issue a different event if ACK_FAILURE???
641                     Close();
642                     socketEvent.type = NORM_SOCKET_CLOSED;
643                     break;
644                 }
645                 default:
646                 {
647                     // TBD - implement option for more persistence
648                     bool success = false;
649                     if (NORM_ACK_SUCCESS == NormGetAckingStatus(norm_session))
650                     {
651                         success = true;
652                     }
653                     else
654                     {
655                         // At least one receiver didn't acknowledge
656                         if (IsUnicastSocket() || IsMulticastClient())
657                         {
658                             // We could be infinitely persistent w/ NormResetWatermark()
659                             //NormResetWatermark(event.session);
660                             // For now, we'll just declare the connection broken/closed
661                             Close();
662                             socketEvent.type = NORM_SOCKET_CLOSED;
663                         }
664                         else
665                         {
666                             // Multicast server, so determine who failed to acknowledge
667                             // and cull them from our acking node list ... and shutdown
668                             // their associated unicast sockets ... ugh!!!
669                             NormNodeId nodeId = NORM_NODE_NONE;
670                             NormAckingStatus ackingStatus;
671                             while (NormGetNextAckingNode(mcast_session, &nodeId, &ackingStatus))
672                             {
673                                 if (NORM_ACK_SUCCESS == ackingStatus)
674                                 {
675                                     success = true;  // there was at least one success
676                                 }
677                                 else
678                                 {
679                                     NormNodeHandle node = NormGetAckingNodeHandle(mcast_session, nodeId);
680                                     assert(NORM_NODE_INVALID != node);
681                                     NormSocket* clientSocket = (NormSocket*)NormNodeGetUserData(node);
682                                     assert(NULL != clientSocket);
683                                     // We use the session timer to dispatch a NORM_SOCKET_CLOSED per failed client
684                                     // (This will also remove the client from this server's acking list)
685                                     NormSetUserTimer(clientSocket->norm_session, 0.0);
686                                     clientSocket->socket_state = CLOSING;
687                                 }
688                             }
689                             // TBD - what do we do if all clients failed ... issue a NORM_SOCKET_DISCONNECT event,
690                             // probably stop sending data and resume when a new client appears ???
691                         }
692                     }
693                     if (tx_watermark_pending && success)
694                     {
695                         // flow control acknowledgement
696                         tx_watermark_pending = false;
697                         tx_stream_buffer_count -= (tx_stream_buffer_max >> 1);
698                         if (!tx_ready)
699                         {
700                             tx_ready = true;
701                             socketEvent.type = NORM_SOCKET_WRITE;
702                         }
703                     }
704                     break;
705                 }
706             }
707             break;
708         }
709         case NORM_REMOTE_SENDER_RESET:
710         case NORM_REMOTE_SENDER_NEW:
711         {
712             switch (socket_state)
713             {
714                 case LISTENING:
715                     socketEvent.type = NORM_SOCKET_ACCEPT;
716                     break;
717                 case ACCEPTING:
718                     if (IsServerSide() && IsClientSocket() && (NORM_NODE_INVALID != remote_node))
719                         NormNodeDelete(remote_node);
720                 case CONNECTING:
721                     // TBD - We should validate that it's the right remote sender
722                     //       (i.e., by source address and/or nodeId)
723                     NormCancelUserTimer(norm_session);
724                     socketEvent.type = NORM_SOCKET_CONNECT;
725                     socket_state = CONNECTED;
726                     remote_node = event.sender;
727                     break;
728                 case CONNECTED:
729                     if (IsMulticastSocket())
730                     {
731                         if (IsServerSocket())
732                         {
733                             // New client showing up at our multicast party
734                             socketEvent.type = NORM_SOCKET_ACCEPT;
735                         }
736                         else
737                         {
738                             // Different sender showing up in multicast group!?
739                             fprintf(stderr, "NormSocket warning: multicast sender %s reset?!\n", NormNodeGetAddressString(event.sender));
740                             // TBD - should Close() the socket and issue a NORM_SOCKET_CLOSED event
741                             //        and leave it up to the application to reconnect?  Or should we
742                             //        provides some sort of NORM_SOCKET_DISCONNECT event
743                             socketEvent.type = NORM_SOCKET_CLOSED;
744                             Close();
745                         }
746                     }
747                     else  // unicast
748                     {
749                         // Remote sender reset? How do we tell?
750                         fprintf(stderr, "NormSocket warning: unicast sender %s reset?!\n", NormNodeGetAddressString(event.sender));
751                         socketEvent.type = NORM_SOCKET_CLOSED;
752                         Close();
753                     }
754                     break;
755 
756                 default:  // CLOSING, CLOSED
757                     // shouldn't happen
758                     break;
759             }
760             break;
761         }
762         case NORM_SEND_ERROR:
763         {
764             switch (socket_state)
765             {
766                 case CONNECTING:
767                 case ACCEPTING:
768                 case CONNECTED:
769                 case CLOSING:
770                     if (IsMulticastServer())
771                         fprintf(stderr, "SEND_ERROR on a multicast server socket?!\n");
772                     socketEvent.event.sender = remote_node;
773                     socketEvent.type = NORM_SOCKET_CLOSED;
774                     Close();
775                     break;
776                 default:
777                     // shouldn't happen
778                     break;
779             }
780             break;
781         }
782 
783         case NORM_USER_TIMEOUT:
784         {
785             switch (socket_state)
786             {
787                 case CONNECTING:    // client connection attempt timed out
788                 case ACCEPTING:     // accepted client didn't follow through
789                 case CONNECTED:     // multicast client ack failure
790                 case CLOSING:
791                     socketEvent.event.sender = remote_node;
792                     socketEvent.type = NORM_SOCKET_CLOSED;
793                     Close();
794                     break;
795                 default:
796                     // shouldn't happen
797                     assert(0);
798                     break;
799             }
800             break;
801         }
802         case NORM_REMOTE_SENDER_INACTIVE:
803         {
804             switch (socket_state)
805             {
806                 case LISTENING:
807                 {
808                     // delete state for remote sender that has been accepted (or not)
809                     // TBD - do something a little more tidy here
810                     NormSocket* clientSocket = (NormSocket*)NormNodeGetUserData(event.sender);
811                     if ((NULL != clientSocket) && (clientSocket->remote_node == event.sender))
812                         clientSocket->remote_node = NORM_NODE_INVALID;
813                     NormNodeDelete(event.sender);
814                     break;
815                 }
816                 case CONNECTED:
817                 {
818                     if (IsServerSocket())
819                     {
820                         NormSocket* clientSocket = (NormSocket*)NormNodeGetUserData(event.sender);
821                         if ((NULL != clientSocket) && (clientSocket->remote_node == event.sender))
822                             clientSocket->remote_node = NORM_NODE_INVALID;
823                         NormNodeDelete(event.sender);
824                     }
825                     // TBD - should we do something here (perhaps issue a NORM_SOCKET_IDLE event or something
826                     // that could be used as a clue that our "connection" may have broken or timed out???
827                     // (Meanwhile, applications will have to figure that out for themselves)
828                     break;
829                 }
830                 default:  // CONNECTING, ACCEPTING, CLOSING, CLOSED
831                 {
832                     // shouldn't happen
833                     break;
834                 }
835             }
836             break;
837         }
838         case NORM_RX_OBJECT_NEW:
839         {
840             switch (socket_state)
841             {
842                 case LISTENING:
843                     // TBD - shouldn't happen, delete sender right away?
844                     break;
845                 case CONNECTED:
846                     // TBD - make sure the sender is who we expect it to be???
847                     if (IsServerSocket()) break;
848                     if (NORM_OBJECT_INVALID == rx_stream)
849                     {
850                         // We're expecting this, new stream ready for reading ...
851                         InitRxStream(event.object);
852                         socketEvent.type = NORM_SOCKET_READ;
853                     }
854                     else
855                     {
856                         // Stream reset
857                         fprintf(stderr, "NormSocket::GetSocketEvent(): client stream reset?!\n");
858                     }
859                     break;
860                 default:  // CONNECTING, ACCEPTING, CLOSING, CLOSED
861                     // shouldn't happen
862                     break;
863             }
864             break;
865         }
866         case NORM_RX_OBJECT_UPDATED:
867         {
868             switch (socket_state)
869             {
870                 case CONNECTED:
871                 case CLOSING:  // we allow reading during graceful closure
872                     // TBD - use an rx_ready indication to filter this event a little more
873                     if (IsServerSocket()) break;  // we don't receive data on server socket
874                     assert(event.object == rx_stream);
875                     socketEvent.type = NORM_SOCKET_READ;
876                     break;
877                 default:
878                     // shouldn't happen
879                     break;
880             }
881             break;
882         }
883         case NORM_RX_OBJECT_COMPLETED:
884         {
885             rx_stream = NORM_OBJECT_INVALID;
886             switch (socket_state)
887             {
888                 case CONNECTED:
889                     // Initiate graceful closure of our tx_stream to allow at least some time to
890                     // acknowledge the remote before closing everything down
891                     NormStreamClose(tx_stream, true);  // Note our "trick" here to do a graceful close, _then_ watermark to get ack
892                     NormSetWatermark(norm_session, tx_stream, true);  // future NORM API will add "bool watermark" option to graceful close
893                     socket_state = CLOSING;
894                     socketEvent.type = NORM_SOCKET_CLOSING;
895                     break;
896 
897                 case CLOSING:
898                     // We're already closing, so just let that complete.  This helps make sure we allow
899                     // at least some time to acknowledge the remote before closing everything down
900                     break;
901                 default:
902                     // shouldn't happen
903                     break;
904             }
905         }
906         default:
907             break;
908     }
909 }  // end NormSocket::GetSocketEvent()
910 
911 
912 
913  ///////////////////////////////////////////////////////////////////////////////////
914  // NormSocket API implementation
915 
916 // TBD - provide options for binding to a specific local address, interface, etc
NormListen(NormInstanceHandle instance,UINT16 serverPort,const char * groupAddr)917 NormSocketHandle NormListen(NormInstanceHandle instance, UINT16 serverPort, const char* groupAddr)
918 {
919     // TBD - check results
920     NormSocket* normSocket = new NormSocket();
921     normSocket->Listen(instance, serverPort, groupAddr);
922     return (NormSocketHandle)normSocket;
923 }  // end NormListen()
924 
925 
NormAccept(NormSocketHandle serverSocket,NormNodeHandle client)926 NormSocketHandle NormAccept(NormSocketHandle serverSocket, NormNodeHandle client)
927 {
928     // TBD - VALIDATE PARAMETERS AND ERROR CHECK ALL THE API CALLS MADE HERE !!!!!
929     NormSocket* s = (NormSocket*)serverSocket;
930     return (NormSocketHandle)(s->Accept(client));
931 }  // end NormAccept()
932 
933 
934 // TBD - provide options for binding to a specific local address, interface, etc
NormConnect(NormInstanceHandle instance,const char * serverAddr,UINT16 serverPort,const char * groupAddr,NormNodeId clientId)935 NormSocketHandle NormConnect(NormInstanceHandle instance, const char* serverAddr, UINT16 serverPort, const char* groupAddr, NormNodeId clientId)
936 {
937     NormSocket* normSocket = new NormSocket();
938     if (NULL == normSocket)
939     {
940         perror("NormConnect() new NormSocket() error");
941         return NULL;
942     }
943     if (normSocket->Connect(instance, serverAddr, serverPort, groupAddr, clientId))
944     {
945         return normSocket;
946     }
947     else
948     {
949         delete normSocket;
950         return NULL;
951     }
952 }  // end NormConnect()
953 
954 
NormWrite(NormSocketHandle normSocket,const void * buf,size_t nbyte)955 ssize_t NormWrite(NormSocketHandle normSocket, const void *buf, size_t nbyte)
956 {
957     // TBD - we could make write() and read() optionally blocking or non-blocking
958     //       by using GetSocketEvent() as appropriate (incl. returning error conditions, etc)
959     NormSocket* s = (NormSocket*)normSocket;
960     return (ssize_t)s->Write((const char*)buf, nbyte);
961 }  // end NormWrite()
962 
NormFlush(NormSocketHandle normSocket)963 int NormFlush(NormSocketHandle normSocket)
964 {
965     NormSocket* s = (NormSocket*)normSocket;
966     s->Flush();
967     return 0;
968 } // end NormFlush()
969 
NormRead(NormSocketHandle normSocket,void * buf,size_t nbyte)970 ssize_t NormRead(NormSocketHandle normSocket, void *buf, size_t nbyte)
971 {
972     // TBD - we could make write() and read() optionally blocking or non-blocking
973     //       by using GetSocketEvent() as appropriate (incl. returning error conditions, etc)
974     NormSocket* s = (NormSocket*)normSocket;
975     // TBD - make sure s->rx_stream is valid
976     unsigned int numBytes = nbyte;
977     if (s->Read((char*)buf, numBytes))
978         return numBytes;
979     else
980         return -1; // broken stream error (TBD - enumerate socket error values)
981 }  // end NormWrite()
982 
983 
NormShutdown(NormSocketHandle normSocket)984 void NormShutdown(NormSocketHandle normSocket)
985 {
986     NormSocket* s = (NormSocket*)normSocket;
987     s->Shutdown();
988 }  // end NormShutdown()
989 
NormClose(NormSocketHandle normSocket)990 void NormClose(NormSocketHandle normSocket)
991 {
992     NormSocket* s = (NormSocket*)normSocket;
993     s->Close();
994 }  // end NormClose()
995 
996 // This gets and translates low level NORM API events to NormSocket events
997 // given the "normSocket" state
NormGetSocketEvent(NormInstanceHandle instance,NormSocketEvent * socketEvent,bool waitForEvent)998 bool NormGetSocketEvent(NormInstanceHandle instance, NormSocketEvent* socketEvent, bool waitForEvent)
999 {
1000     if (NULL == socketEvent) return false;
1001     NormEvent event;
1002     if (NormGetNextEvent(instance, &event, waitForEvent))
1003     {
1004         NormSocket* normSocket = NULL;
1005         if (NORM_SESSION_INVALID != event.session)
1006             normSocket = (NormSocket*)NormGetUserData(event.session);
1007         if (NULL == normSocket)
1008         {
1009             socketEvent->type = NORM_SOCKET_NONE;
1010             socketEvent->socket = NORM_SOCKET_INVALID;
1011             socketEvent->event = event;
1012         }
1013         else
1014         {
1015             normSocket->GetSocketEvent(event, *socketEvent);
1016         }
1017         return true;
1018     }
1019     else
1020     {
1021         return false;
1022     }
1023 }  // end NormGetSocketEvent()
1024 
1025 // Other helper functions
1026 
NormGetSession(NormSocketHandle normSocket)1027 NormSessionHandle NormGetSession(NormSocketHandle normSocket)
1028 {
1029     NormSocket* s = (NormSocket*)normSocket;
1030     return s->GetSession();
1031 }  // end NormGetSession()
1032 
NormGetMulticastSession(NormSocketHandle normSocket)1033 NormSessionHandle NormGetMulticastSession(NormSocketHandle normSocket)
1034 {
1035     NormSocket* s = (NormSocket*)normSocket;
1036     return s->GetMulticastSession();
1037 }  // end NormGetSession()
1038