1
2 #include "normSocket.h"
3 #include <stdio.h> // for stderr
4 #include <assert.h> // for assert()
5 #include <string.h> // for strlen()
6 #include <arpa/inet.h> // for inet_ntoa() (TBD - change to use Protolib routines?)
7
8 // COMPILE: (assumes "normApi.h" in "include" ...
9 // g++ -I../include -c normSocket.cpp
10
11
12 // This "NormSocket" class is used to maintain tx/rx state for a NORM "socket" connection.
13 // At the moment this "socket" connection represents a single, bi-directional NORM_OBJECT_STREAM
14 // in either a unicast context or an asymmetric "server" multicast stream to possibly multiple "client"
15 // nodes with individual unicast streams in return from those "client" nodes. (I.e., the server will need to
16 // have a normSocket per client even for the server multicast case (maybe :-) )
17
18 const NormSocketHandle NORM_SOCKET_INVALID = (NormSocketHandle)0;
19
20 const double NORM_DEFAULT_CONNECT_TIMEOUT = 60.0;
21
22 // a 'helper' function we use for debugging
NormNodeGetAddressString(NormNodeHandle node)23 const char* NormNodeGetAddressString(NormNodeHandle node)
24 {
25 char addr[16]; // big enough for IPv6
26 unsigned int addrLen = 16;
27 UINT16 port;
28 if (NormNodeGetAddress(node, addr, &addrLen, &port))
29 {
30 static char text[64];
31 text[0] = text[31] = '\0';
32 int addrFamily;
33 if (4 == addrLen)
34 addrFamily = AF_INET;
35 else
36 addrFamily = AF_INET6;
37 inet_ntop(addrFamily, addr, text, 31);
38 sprintf(text + strlen(text), "/%hu", port);
39 return text;
40 }
41 else
42 {
43 return "???";
44 }
45 } // end NormNodeGetAddressString()
46
47 class NormSocket
48 {
49 public:
50 NormSocket(NormSessionHandle normSession = NORM_SESSION_INVALID);
51
52 // These methods identify the role of this socket with respect
53 // to the client / server relationship (a "server socket" is
54 // one for which NormListen() has been invoked).
IsServerSocket() const55 bool IsServerSocket() const
56 {return (server_socket == this);}
IsClientSocket() const57 bool IsClientSocket() const
58 {return (server_socket != this);}
IsUnicastSocket() const59 bool IsUnicastSocket() const
60 {return (NORM_SESSION_INVALID == mcast_session);}
IsMulticastSocket() const61 bool IsMulticastSocket() const
62 {return !IsUnicastSocket();}
IsMulticastServer() const63 bool IsMulticastServer() const
64 {return (IsMulticastSocket() && IsServerSocket());}
IsMulticastClient() const65 bool IsMulticastClient() const
66 {return (IsMulticastSocket() && IsClientSocket());}
IsServerSide() const67 bool IsServerSide() const
68 {return (NULL != server_socket);}
IsClientSide() const69 bool IsClientSide() const
70 {return (NULL == server_socket);}
71
GetSession() const72 NormSessionHandle GetSession() const
73 {return norm_session;}
GetMulticastSession() const74 NormSessionHandle GetMulticastSession() const
75 {return mcast_session;}
76
InitRxStream(NormObjectHandle rxStream)77 void InitRxStream(NormObjectHandle rxStream)
78 {rx_stream = rxStream;}
GetRxStream() const79 NormObjectHandle GetRxStream() const
80 {return rx_stream;}
81
InitTxStream(NormObjectHandle txStream,unsigned int bufferSize,UINT16 segmentSize,UINT16 blockSize)82 void InitTxStream(NormObjectHandle txStream, unsigned int bufferSize, UINT16 segmentSize, UINT16 blockSize)
83 {
84 tx_stream = txStream;
85 tx_segment_size = segmentSize;
86 tx_stream_buffer_max = NormGetStreamBufferSegmentCount(bufferSize, segmentSize, blockSize);
87 tx_stream_buffer_max -= blockSize; // a little safety margin (perhaps not necessary)
88 tx_stream_buffer_count = 0;
89 tx_stream_bytes_remain = 0;
90 tx_watermark_pending = false;
91 }
92
93
94 bool Listen(NormInstanceHandle instance, UINT16 serverPort, const char* groupAddr);
95 NormSocket* Accept(NormNodeHandle client, NormInstanceHandle instance = NORM_INSTANCE_INVALID);
96 bool Connect(NormInstanceHandle instance, const char* serverAddr, UINT16 serverPort, const char* groupAddr, NormNodeId clientId);
97
98
99 // Write to tx stream (with flow control)
100 unsigned int Write(const char* buffer, unsigned int numBytes);
101 void Flush(bool eom = false, NormFlushMode flushMode = NORM_FLUSH_ACTIVE);
102 // Read from rx_stream
103 bool Read(char* buffer, unsigned int& numBytes);
104
105 // "graceful" shutdown (stream is flushed and stream end, etc)
106 void Shutdown();
107
108 // hard, immediate closure
109 void Close();
110
111 void GetSocketEvent(const NormEvent& event, NormSocketEvent& socketEvent);
112
113 typedef enum State
114 {
115 CLOSED,
116 LISTENING,
117 CONNECTING,
118 ACCEPTING,
119 CONNECTED,
120 CLOSING
121 } State;
122
AddAckingNode(NormNodeId nodeId)123 bool AddAckingNode(NormNodeId nodeId)
124 {
125 if (NormAddAckingNode(norm_session, nodeId))
126 {
127 client_count++;
128 return true;
129 }
130 else
131 {
132 return false;
133 }
134 }
RemoveAckingNode(NormNodeId nodeId)135 void RemoveAckingNode(NormNodeId nodeId)
136 {NormRemoveAckingNode(norm_session, nodeId);}
137
138
139 private:
140 State socket_state;
141 NormSessionHandle norm_session;
142 NormSessionHandle mcast_session; // equals norm_session for a multicast server
143 NormSocket* server_socket; // only applies to server-side sockets
144 unsigned int client_count; // only applies to mcast server sockets
145 NormNodeId client_id; // only applies to mcast client socket
146 NormNodeHandle remote_node; //
147 // Send stream and associated flow control state variables
148 NormObjectHandle tx_stream;
149 bool tx_ready;
150 UINT16 tx_segment_size;
151 unsigned int tx_stream_buffer_max;
152 unsigned int tx_stream_buffer_count;
153 unsigned int tx_stream_bytes_remain;
154 bool tx_watermark_pending;
155 // Receive stream state
156 NormObjectHandle rx_stream;
157
158 }; // end class NormSocket
159
160
NormSocket(NormSessionHandle normSession)161 NormSocket::NormSocket(NormSessionHandle normSession)
162 : socket_state(CLOSED), norm_session(normSession),
163 mcast_session(NORM_SESSION_INVALID), server_socket(NULL),
164 client_count(0), client_id(NORM_NODE_NONE), remote_node(NORM_NODE_INVALID),
165 tx_stream(NORM_OBJECT_INVALID), tx_ready(false), tx_segment_size(0),
166 tx_stream_buffer_max(0), tx_stream_buffer_count(0),
167 tx_stream_bytes_remain(0), tx_watermark_pending(false),
168 rx_stream(NORM_OBJECT_INVALID)
169 {
170 // For now we use the NormSession "user data" option to associate
171 // the session with a "socket". In the future we may add a
172 // dedicated NormSetSocket(NormSessionHandle session, NormSocketHandle normSocket) API
173 // to keep the "user data" feature available for other purposes
174 if (NORM_SESSION_INVALID != normSession) // this should always be true
175 NormSetUserData(normSession, this);
176 }
177
178
Listen(NormInstanceHandle instance,UINT16 serverPort,const char * groupAddr)179 bool NormSocket::Listen(NormInstanceHandle instance, UINT16 serverPort, const char* groupAddr)
180 {
181 if (CLOSED != socket_state)
182 {
183 fprintf(stderr, "NormSocket::Listen() error: socket already open?!\n");
184 return false;
185 }
186
187 if (NULL != groupAddr)
188 {
189 // TBD - validate that "groupAddr" is indeed a multicast address
190 norm_session = NormCreateSession(instance, groupAddr, serverPort, NORM_NODE_ANY);
191 NormSetTxPort(norm_session, serverPort); // can't do this and receive unicast feedback
192 mcast_session = norm_session;
193 }
194 else
195 {
196 // For unicast , the "server" has a NormNodeId of '1' and the "clients" are '2'
197 // to obviate need for explicit id management and will allow NAT to work, etc
198 norm_session = NormCreateSession(instance, "127.0.0.1", serverPort, 1);
199 }
200 if (NORM_SESSION_INVALID == norm_session)
201 {
202 fprintf(stderr, "NormSocket::Listen() error: NormCreateSession() failure\n");
203 return false;
204 }
205 NormSetUserData(norm_session, this);
206 // Note the port reuse here lets us manage our "client" rx-only unicast connections the
207 // way we need, but does allow a second multicast server to be started on this group which leads
208 // to undefined behavior. TBD - see if we can prevent via binding wizardry
209 // (How is it done for TCP servers? - probably because the accept() call is in the network stack
210 // instead of user-space) Perhaps we could have a semaphore lock to block a second "server"
211 NormSetRxPortReuse(norm_session, true);
212
213 // use default sync policy so a "serversocket" doesn't NACK the senders it detects
214 // NORM_SYNC_STREAM tries to get everything the sender has cached/buffered
215 //NormSetDefaultSyncPolicy(norm_session, NORM_SYNC_STREAM);
216 //NormSetDefaultSyncPolicy(norm_session, NORM_SYNC_ALL);
217
218 if (NULL == groupAddr)
219 {
220 // Unicast server
221 // Note we use a small buffer size here since a "listening" socket isn't
222 // going to be receiving data (TBD - implement a mechanism to handoff remote
223 // sender (i.e. "client") from parent
224 if (!NormStartReceiver(norm_session, 2048))
225 {
226 fprintf(stderr, "NormSocket::Listen() error: NormStartReceiver() failure (perhaps port already in use)\n");
227 NormDestroySession(norm_session);
228 norm_session = NORM_SESSION_INVALID;
229 }
230 }
231 else
232 {
233 //NormSetMulticastInterface(norm_session, "lo0");
234 NormSetMulticastLoopback(norm_session, true); // for testing
235 if (!NormStartReceiver(norm_session, 2048))
236 {
237 fprintf(stderr, "NormSocket::Listen() error: NormStartReceiver() failure (perhaps port already in use)\n");
238 NormDestroySession(norm_session);
239 norm_session = NORM_SESSION_INVALID;
240 }
241 // TBD - We _could_ go ahead and call NormStartSender(), but for now we'll wait until we hear the application
242 // makes at least one NormAccept() call ...
243 }
244 server_socket = this;
245 socket_state = LISTENING;
246 return true;
247 } // end NormSocket::Listen()
248
Accept(NormNodeHandle client,NormInstanceHandle instance)249 NormSocket* NormSocket::Accept(NormNodeHandle client, NormInstanceHandle instance)
250 {
251 if (!IsServerSocket()) return NULL;
252 char clientAddr[64];
253 clientAddr[63] = '\0';
254 char addr[16]; // big enough for IPv6
255 unsigned int addrLen = 16;
256 UINT16 clientPort;
257 NormNodeGetAddress(client, addr, &addrLen, &clientPort);
258 int addrFamily;
259 UINT8 version;
260 if (4 == addrLen)
261 {
262 addrFamily = AF_INET;
263 version = 4;
264 }
265 else
266 {
267 addrFamily = AF_INET6;
268 version = 6;
269 }
270 inet_ntop(addrFamily, addr, clientAddr, 63);
271
272 UINT16 serverPort = NormGetRxPort(norm_session);
273 if (NORM_INSTANCE_INVALID == instance)
274 instance = NormGetInstance(norm_session);
275
276 NormSessionHandle clientSession = NormCreateSession(instance, clientAddr, serverPort, 1);
277
278 NormSetTxPort(clientSession, serverPort, false);
279
280 // This next API call will cause NORM to tightly bind the remote client src addr/port to
281 // our server port so the "clientSession" captures the client packets instead of the "server" session
282
283 // Any new packets will come to our new connected clientSession instead
284 // However, note that even though we've "connected" this sender,
285 // there is a chance that additional packets in the "serverSession"
286 // rx socket buffer may look like a new sender if deleted now, so
287 // we wait for NORM_REMOTE_SENDER_INACTIVE to delete
288
289 // NORM_SYNC_STREAM tries to get everything the sender has cached/buffered
290 //NormSetDefaultSyncPolicy(norm_session, NORM_SYNC_STREAM);
291 NormSetDefaultSyncPolicy(norm_session, NORM_SYNC_ALL);
292
293 NormSetRxPortReuse(clientSession, true, 0, clientAddr, clientPort); // "connects" to remote client addr/port
294 NormSetDefaultUnicastNack(clientSession, true);
295
296 NormStartReceiver(clientSession, 2*1024*1024);
297
298 NormSocket* clientSocket = new NormSocket(clientSession);
299 clientSocket->server_socket = this; // this is a server-side socket
300 clientSocket->remote_node = client;
301 NormNodeSetUserData(client, clientSocket);
302
303 NormNodeId clientId = NormNodeGetId(client);
304
305 if (IsUnicastSocket())
306 {
307 // The clientSession is bi-directional so we need to NormStartSender(), etc
308 NormAddAckingNode(clientSession, 2); //clientId);
309 NormSetFlowControl(clientSession, 0); // disable timer-based flow control since we do explicit, ACK-based flow control
310 NormStartSender(clientSession, NormGetRandomSessionId(), 2*1024*1024, 1400, 16, 4);
311 }
312 else // if IsMulticastSocket()
313 {
314 // TBD - should we make sure this not a NormNodeId we already have?
315 // TBD - should we wait to add the client as acking node until CONNECT
316 // (probably for heavyweight; for lightweight we know the client
317 // has already started his multicast receiver)
318 AddAckingNode(clientId); // TBD - check result
319 NormNodeHandle node = NormGetAckingNodeHandle(mcast_session, clientId);
320 NormNodeSetUserData(node, clientSocket); // a way to track mcast client sockets
321 clientSocket->mcast_session = mcast_session;
322 clientSocket->client_id = client_id;
323 if (LISTENING == socket_state)
324 {
325 NormSetFlowControl(norm_session, 0); // disable timer-based flow control since we do explicit, ACK-based flow control
326 NormStartSender(norm_session, NormGetRandomSessionId(), 2*1024*1024, 1400, 16, 4);
327 socket_state = CONNECTED;
328 }
329 /* The code below would be invoked for "heavyweight" mcast client admission
330 (for the moment we go with a "lightweight" model - this might be invokable upon
331 as an optional behavior later)
332
333 // Here, we start the clientSession (w/ a minimal buffer size) and create a temporary sender
334 // stream that is immediately flushed/closed to inform the "client" that his connection
335 // has been accepted. The sender function is terminated upon client acknowledgement
336 NormAddAckingNode(clientSession, clientId);
337 NormSetFlowControl(clientSession, 0); // disable timer-based flow control since we do explicit, ACK-based flow control
338 NormStartSender(clientSession, NormGetRandomSessionId(), 1024, 512, 1, 0);
339 NormObjectHandle tempStream = NormStreamOpen(clientSession, 1024);
340 NormStreamClose(tempStream, true); // Note our "trick" here to do a graceful close, _then_ watermark to get ack
341 NormSetWatermark(clientSession, tempStream, true); // future NORM API will add "bool watermark" option to graceful close
342 */
343 }
344 clientSocket->socket_state = ACCEPTING; // will transision to CONNECTED when client is detected on new clientSession
345 return clientSocket;
346 } // end NormSocket::Accept()
347
348 // TBD - provide options for binding to a specific local address, interface, etc
Connect(NormInstanceHandle instance,const char * serverAddr,UINT16 serverPort,const char * groupAddr,NormNodeId clientId)349 bool NormSocket::Connect(NormInstanceHandle instance, const char* serverAddr, UINT16 serverPort, const char* groupAddr, NormNodeId clientId)
350 {
351 // For unicast connections, the "client" manages a single NormSession for send and receive
352 // (For multicast connections, there are two sessions: The same unicast session that will
353 // be set to txOnly upon CONNECT and a NormSession for multicast reception)
354 norm_session = NormCreateSession(instance, "127.0.0.1", 0, clientId); // TBD - use "clientId" here for mcast sockets?
355 if (NORM_SESSION_INVALID == norm_session)
356 {
357 fprintf(stderr, "NormSocket::Connect() error: NormCreateSession() failure\n");
358 return false;
359 }
360 // NORM_SYNC_STREAM tries to get everything the sender has cached/buffered
361 //NormSetDefaultSyncPolicy(norm_session, NORM_SYNC_STREAM);
362 NormSetDefaultSyncPolicy(norm_session, NORM_SYNC_ALL);
363
364 NormSetUserData(norm_session, this);
365 NormSetRxPortReuse(norm_session, true, NULL, serverAddr, serverPort);
366 // TBD - for a multicast connection, the unicast receiver could be started with minimal buffer
367 // (not that it matters since the buffers aren't activated until a sender starts sending _data_)
368 if (!NormStartReceiver(norm_session, 2*1024*1024)) // to get ephemeral port assigned
369 {
370 fprintf(stderr, "NormSocket::Connect() error: unicast NormStartReceiver() failure\n");
371 return false;
372 }
373 NormChangeDestination(norm_session, serverAddr, serverPort, false); // "connect" our NORM tx_socket (so we can get ICMP)
374 NormSessionId sessionId = NormGetRandomSessionId();
375 NormAddAckingNode(norm_session, 1); // servers always have NormNodeId '1'
376 NormSetFlowControl(norm_session, 0); // since we do explicit, ACK-based flow control
377 if (!NormStartSender(norm_session, sessionId, 2*1024*1024, 1400, 16, 4))
378 {
379 fprintf(stderr, "NormSocket::Connect() error: NormStartSender() failure\n");
380 return false;
381 }
382
383 if (NULL != groupAddr)
384 {
385 // Create the "mcast_session" for multicast reception
386 mcast_session = NormCreateSession(instance, groupAddr, serverPort, clientId);
387 //NormSetTxPort(mcast_session, serverPort); // TBD - not sure this is a good idea if multiple clients on a machine?
388 NormSetUserData(mcast_session, this);
389 // NORM_SYNC_STREAM tries to get everything the sender has cached/buffered
390 //NormSetDefaultSyncPolicy(mcast_session, NORM_SYNC_STREAM);
391 NormSetDefaultSyncPolicy(mcast_session, NORM_SYNC_ALL);
392
393 NormSetDefaultUnicastNack(mcast_session, true); // we could optionally allow multicast NACKing, too
394 NormSetMulticastLoopback(norm_session, true); // for testing
395 client_id = clientId;
396 // TBD - make this SSM??? ... this would allow for multiple servers using the same groupAddr/port
397 NormSetRxPortReuse(mcast_session, true, groupAddr); // Should we upgrade rx port reuse and 'connect' to server tx port upon CONNECT?
398 // For a "lightweight" client->server connection establishment, we go ahead and
399 // stop our unicast receiver and start multicast receiver, assuming the server
400 // will admit us into the group.
401 // (TBD - provide a "heavier weight" connection acceptance confirmation/denial signal from server
402 // via unicast from server -> client here (i.e. keep the unicast receiver open)
403 if (!NormStartReceiver(mcast_session, 2*1024*1024)) // to get ephemeral port assigned
404 {
405 fprintf(stderr, "NormSocket::Connect() error: multicast NormStartReceiver() failure\n");
406 return false;
407 }
408 }
409 else
410 {
411 // Set timeout for connect attempt (for "heavyweight" mcast connect, this would also be done)
412 NormSetUserTimer(norm_session, NORM_DEFAULT_CONNECT_TIMEOUT);
413 }
414 server_socket = NULL; // this is a client-side socket
415
416 socket_state = CONNECTING;
417
418 return true;
419 } // end NormSocket::Connect()
420
421
Write(const char * buffer,unsigned int numBytes)422 unsigned int NormSocket::Write(const char* buffer, unsigned int numBytes)
423 {
424 // TBD - make sure the socket is CONNECTED first
425 if (IsMulticastClient() && IsServerSide())
426 {
427 // This is multicast server rxonly client socket, so we redirect
428 // the write() to the associated txonly multicast socket
429 return server_socket->Write(buffer, numBytes);
430 }
431 // TBD - if tx_stream not yet open, open it!!!
432 if (NORM_OBJECT_INVALID == tx_stream)
433 {
434 tx_stream = NormStreamOpen(norm_session, 2*1024*1024);
435 InitTxStream(tx_stream, 2*1024*1024, 1400, 16);
436 }
437
438 // This method uses NormStreamWrite(), but limits writes by explicit ACK-based flow control status
439 if (tx_stream_buffer_count < tx_stream_buffer_max)
440 {
441 // 1) How many buffer bytes are available?
442 unsigned int bytesAvailable = tx_segment_size * (tx_stream_buffer_max - tx_stream_buffer_count);
443 bytesAvailable -= tx_stream_bytes_remain; // unflushed segment portiomn
444 if (numBytes <= bytesAvailable)
445 {
446 unsigned int totalBytes = numBytes + tx_stream_bytes_remain;
447 unsigned int numSegments = totalBytes / tx_segment_size;
448 tx_stream_bytes_remain = totalBytes % tx_segment_size;
449 tx_stream_buffer_count += numSegments;
450 }
451 else
452 {
453 numBytes = bytesAvailable;
454 tx_stream_buffer_count = tx_stream_buffer_max;
455 }
456 // 2) Write to the stream
457 unsigned int bytesWritten = NormStreamWrite(tx_stream, buffer, numBytes);
458 //assert(bytesWritten == numBytes); // this could happen if timer-based flow control is left enabled
459 // 3) Check if we need to issue a watermark ACK request?
460 if (!tx_watermark_pending && (tx_stream_buffer_count >= (tx_stream_buffer_max / 2)))
461 {
462 //fprintf(stderr, "tx_engine_t::WriteToNormStream() initiating watermark ACK request (buffer count:%lu max:%lu usage:%u)...\n",
463 // tx_stream_buffer_count, tx_stream_buffer_max, NormStreamGetBufferUsage(tx_stream));
464 NormSetWatermark(norm_session, tx_stream);
465 tx_watermark_pending = true;
466 }
467 return bytesWritten;
468 }
469 else
470 {
471 return 0;
472 }
473 } // end NormSocket::Write()
474
Flush(bool eom,NormFlushMode flushMode)475 void NormSocket::Flush(bool eom, NormFlushMode flushMode)
476 {
477 // TBD - make sure the socket is CONNECTED first
478 if (IsMulticastClient() && IsServerSide())
479 {
480 // This is multicast server rxOnly client socket, so we redirect
481 // the flush() to the associated txonly multicast socket
482 return server_socket->Flush(eom, flushMode);
483 }
484
485 // NormStreamFlush always will transmit pending runt segments, if applicable
486 // (thus we need to manage our buffer counting accordingly if pending bytes remain)
487 if (tx_watermark_pending)
488 {
489 NormStreamFlush(tx_stream, eom, flushMode);
490 }
491 else if (NORM_FLUSH_ACTIVE == flushMode)
492 {
493 // we flush passive, because watermark forces active ack request
494 NormStreamFlush(tx_stream, eom, NORM_FLUSH_PASSIVE);
495 NormSetWatermark(norm_session, tx_stream, true);
496 }
497 else
498 {
499 NormStreamFlush(tx_stream, eom, flushMode);
500 }
501
502 if (0 != tx_stream_bytes_remain)
503 {
504 // The flush forces the runt segment out, so we increment our buffer usage count
505 tx_stream_buffer_count++;
506 tx_stream_bytes_remain = 0;
507 if (!tx_watermark_pending && (tx_stream_buffer_count >= (tx_stream_buffer_max >> 1)))
508 {
509 //fprintf(stderr, "tx_engine_t::stream_flush() initiating watermark ACK request (buffer count:%lu max:%lu usage:%u)...\n",
510 // tx_stream_buffer_count, tx_stream_buffer_max);
511 NormSetWatermark(norm_session, tx_stream, true);
512 tx_watermark_pending = true;
513 }
514 }
515 } // end NormSocket::Flush()
516
Read(char * buffer,unsigned int & numBytes)517 bool NormSocket::Read(char* buffer, unsigned int& numBytes)
518 {
519 // TBD - make sure rx_stream is valid!
520 // TBD - make sure this is not a tx only client socket ...
521 return NormStreamRead(rx_stream, buffer, &numBytes);
522 } // end NormSocket::Read()
523
Shutdown()524 void NormSocket::Shutdown()
525 {
526 if ((NORM_OBJECT_INVALID == tx_stream) ||
527 (IsServerSide() && IsMulticastClient()))
528 {
529 Close(); // close immediately since this socket doesn't control a tx_stream
530 }
531 else
532 {
533 // It controls a tx_stream, so shutdown the tx_stream gracefully
534 NormStreamClose(tx_stream, true); // Note our "trick" here to do a graceful close, _then_ watermark to get ack
535 NormSetWatermark(norm_session, tx_stream, true); // future NORM API will add "bool watermark" option to graceful close
536 socket_state = CLOSING;
537 }
538 } // end NormSocket::Shutdown()
539
Close()540 void NormSocket::Close()
541 {
542 if (IsMulticastSocket())
543 {
544 if (IsServerSide())
545 {
546 if (IsServerSocket())
547 {
548 // IsMulticastSocket() guarantees the mcast_session is valid
549 // Dissociate remaining clients from this session and set their
550 // timer so that NORM_SOCKET_CLOSED events are dispatched for them
551 NormNodeId nodeId = NORM_NODE_NONE;
552 while (NormGetNextAckingNode(mcast_session, &nodeId))
553 {
554 NormNodeHandle node = NormGetAckingNodeHandle(mcast_session, nodeId);
555 assert(NORM_NODE_INVALID != node);
556 NormSocket* clientSocket = (NormSocket*)NormNodeGetUserData(node);
557 NormSetUserTimer(clientSocket->norm_session, 0.0);
558 }
559 // for mcast server mcast_session == norm_session so it's destroyed below
560 }
561 else
562 {
563 // "IsServerSide()" guarantees the "server_socket" is non-NULL
564 // server-side multicast client socket closing, so we
565 // need to remove this "client" NormNodeId from the mcast
566 // session's acking node list
567 server_socket->RemoveAckingNode(client_id);
568 }
569 }
570 else // client-side multicast socket, so we need to destroy mcast_session, too
571 {
572 NormDestroySession(mcast_session);
573 }
574 mcast_session = NORM_SESSION_INVALID;
575 }
576 if (NORM_SESSION_INVALID != norm_session)
577 {
578 NormDestroySession(norm_session);
579 norm_session = NORM_SESSION_INVALID;
580 }
581 server_socket = NULL;
582 tx_stream = NORM_OBJECT_INVALID;
583 tx_segment_size = 0;
584 tx_stream_buffer_max = tx_stream_buffer_count = tx_stream_bytes_remain = 0;
585 tx_watermark_pending = false;
586 rx_stream = NORM_OBJECT_INVALID;
587 socket_state = CLOSED;
588 } // end NormSocket::Close()
589
590
GetSocketEvent(const NormEvent & event,NormSocketEvent & socketEvent)591 void NormSocket::GetSocketEvent(const NormEvent& event, NormSocketEvent& socketEvent)
592 {
593 socketEvent.socket = (NormSocketHandle)this;
594 socketEvent.type = NORM_SOCKET_NONE; // default socket event type if no socket-specific state change occurs
595 socketEvent.event = event;
596 switch (event.type)
597 {
598 case NORM_TX_QUEUE_EMPTY:
599 case NORM_TX_QUEUE_VACANCY:
600 {
601 // The socket may be tx ready, so issue a NORM_SOCKET_WRITE event
602 if (CONNECTED == socket_state)
603 {
604 if (!tx_ready)
605 {
606 tx_ready = true;
607 socketEvent.type = NORM_SOCKET_WRITE;
608 }
609 }
610 break;
611 }
612 case NORM_TX_WATERMARK_COMPLETED:
613 {
614 switch (socket_state)
615 {
616 case ACCEPTING:
617 {
618 // This only comes into play for the "confirmed connection"
619 // model for multicast sockets (not yet implemented)
620 assert(0);
621 assert(IsServerSide() && IsMulticastClient());
622 if (NORM_ACK_SUCCESS == NormGetAckingStatus(norm_session))
623 {
624 // Client has acknowledged our acceptance
625 socketEvent.type = NORM_SOCKET_CONNECT;
626 NormStopSender(norm_session); // the mcast_session is our tx channel
627 break;
628 }
629 else
630 {
631 // Client didn't acknowledge, so we cull him from our server
632 Close();
633 socketEvent.type = NORM_SOCKET_CLOSED;
634 }
635 break;
636 }
637 case CLOSING:
638 {
639 // Socket that was shutdown has either been acknowledged or timed out
640 // TBD - should we issue a different event if ACK_FAILURE???
641 Close();
642 socketEvent.type = NORM_SOCKET_CLOSED;
643 break;
644 }
645 default:
646 {
647 // TBD - implement option for more persistence
648 bool success = false;
649 if (NORM_ACK_SUCCESS == NormGetAckingStatus(norm_session))
650 {
651 success = true;
652 }
653 else
654 {
655 // At least one receiver didn't acknowledge
656 if (IsUnicastSocket() || IsMulticastClient())
657 {
658 // We could be infinitely persistent w/ NormResetWatermark()
659 //NormResetWatermark(event.session);
660 // For now, we'll just declare the connection broken/closed
661 Close();
662 socketEvent.type = NORM_SOCKET_CLOSED;
663 }
664 else
665 {
666 // Multicast server, so determine who failed to acknowledge
667 // and cull them from our acking node list ... and shutdown
668 // their associated unicast sockets ... ugh!!!
669 NormNodeId nodeId = NORM_NODE_NONE;
670 NormAckingStatus ackingStatus;
671 while (NormGetNextAckingNode(mcast_session, &nodeId, &ackingStatus))
672 {
673 if (NORM_ACK_SUCCESS == ackingStatus)
674 {
675 success = true; // there was at least one success
676 }
677 else
678 {
679 NormNodeHandle node = NormGetAckingNodeHandle(mcast_session, nodeId);
680 assert(NORM_NODE_INVALID != node);
681 NormSocket* clientSocket = (NormSocket*)NormNodeGetUserData(node);
682 assert(NULL != clientSocket);
683 // We use the session timer to dispatch a NORM_SOCKET_CLOSED per failed client
684 // (This will also remove the client from this server's acking list)
685 NormSetUserTimer(clientSocket->norm_session, 0.0);
686 clientSocket->socket_state = CLOSING;
687 }
688 }
689 // TBD - what do we if all clients failed ... issue a NORM_SOCKET_DISCONNECT event,
690 // probably stop sending data and resume when a new client appears ???
691 }
692 }
693 if (tx_watermark_pending && success)
694 {
695 // flow control acknowledgement
696 tx_watermark_pending = false;
697 tx_stream_buffer_count -= (tx_stream_buffer_max >> 1);
698 if (!tx_ready)
699 {
700 tx_ready = true;
701 socketEvent.type = NORM_SOCKET_WRITE;
702 }
703 }
704 break;
705 }
706 }
707 break;
708 }
709 case NORM_REMOTE_SENDER_RESET:
710 case NORM_REMOTE_SENDER_NEW:
711 {
712 switch (socket_state)
713 {
714 case LISTENING:
715 socketEvent.type = NORM_SOCKET_ACCEPT;
716 break;
717 case ACCEPTING:
718 if (IsServerSide() && IsClientSocket() && (NORM_NODE_INVALID != remote_node))
719 NormNodeDelete(remote_node);
720 case CONNECTING:
721 // TBD - We should validate that it's the right remote sender
722 // (i.e., by source address and/or nodeId)
723 NormCancelUserTimer(norm_session);
724 socketEvent.type = NORM_SOCKET_CONNECT;
725 socket_state = CONNECTED;
726 remote_node = event.sender;
727 break;
728 case CONNECTED:
729 if (IsMulticastSocket())
730 {
731 if (IsServerSocket())
732 {
733 // New client showing up at our multicast party
734 socketEvent.type = NORM_SOCKET_ACCEPT;
735 }
736 else
737 {
738 // Different sender showing up in multicast group!?
739 fprintf(stderr, "NormSocket warning: multicast sender %s reset?!\n", NormNodeGetAddressString(event.sender));
740 // TBD - should Close() the socket and issue a NORM_SOCKET_CLOSED event
741 // and leave it up to the application to reconnect? Or should we
742 // provides some sort of NORM_SOCKET_DISCONNECT event
743 socketEvent.type = NORM_SOCKET_CLOSED;
744 Close();
745 }
746 }
747 else // unicast
748 {
749 // Eemote sender reset? How do we tell?
750 fprintf(stderr, "NormSocket warning: unicast sender %s reset?!\n", NormNodeGetAddressString(event.sender));
751 socketEvent.type = NORM_SOCKET_CLOSED;
752 Close();
753 }
754 break;
755
756 default: // CLOSING, CLOSED
757 // shouldn't happen
758 break;
759 }
760 break;
761 }
762 case NORM_SEND_ERROR:
763 {
764 switch (socket_state)
765 {
766 case CONNECTING:
767 case ACCEPTING:
768 case CONNECTED:
769 case CLOSING:
770 if (IsMulticastServer())
771 fprintf(stderr, "SEND_ERROR on a multicast server socket?!\n");
772 socketEvent.event.sender = remote_node;
773 socketEvent.type = NORM_SOCKET_CLOSED;
774 Close();
775 break;
776 default:
777 // shouldn't happen
778 break;
779 }
780 break;
781 }
782
783 case NORM_USER_TIMEOUT:
784 {
785 switch (socket_state)
786 {
787 case CONNECTING: // client connection attempt timed out
788 case ACCEPTING: // accepted client didn't follow through
789 case CONNECTED: // multicast client ack failure
790 case CLOSING:
791 socketEvent.event.sender = remote_node;
792 socketEvent.type = NORM_SOCKET_CLOSED;
793 Close();
794 break;
795 default:
796 // shouldn't happen
797 assert(0);
798 break;
799 }
800 break;
801 }
802 case NORM_REMOTE_SENDER_INACTIVE:
803 {
804 switch (socket_state)
805 {
806 case LISTENING:
807 {
808 // delete state for remote sender that has been accepted (or not)
809 // TBD - do something a little more tidy here
810 NormSocket* clientSocket = (NormSocket*)NormNodeGetUserData(event.sender);
811 if ((NULL != clientSocket) && (clientSocket->remote_node == event.sender))
812 clientSocket->remote_node = NORM_NODE_INVALID;
813 NormNodeDelete(event.sender);
814 break;
815 }
816 case CONNECTED:
817 {
818 if (IsServerSocket())
819 {
820 NormSocket* clientSocket = (NormSocket*)NormNodeGetUserData(event.sender);
821 if ((NULL != clientSocket) && (clientSocket->remote_node == event.sender))
822 clientSocket->remote_node = NORM_NODE_INVALID;
823 NormNodeDelete(event.sender);
824 }
825 // TBD - should we do something here (perhaps issue a NORM_SOCKET_IDLE event or something
826 // that could be used as a clue that our "connection" may have broken or timed out???
827 // (Meanwhile, applications will have to figure that out for themselves)
828 break;
829 }
830 default: // CONNECTING, ACCEPTING, CLOSING, CLOSED
831 {
832 // shouldn't happen
833 break;
834 }
835 }
836 break;
837 }
838 case NORM_RX_OBJECT_NEW:
839 {
840 switch (socket_state)
841 {
842 case LISTENING:
843 // TBD - shouldn't happen, delete sender right away?
844 break;
845 case CONNECTED:
846 // TBD - make sure the sender is who we expect it to be???
847 if (IsServerSocket()) break;
848 if (NORM_OBJECT_INVALID == rx_stream)
849 {
850 // We're expecting this, new stream ready for reading ...
851 InitRxStream(event.object);
852 socketEvent.type = NORM_SOCKET_READ;
853 }
854 else
855 {
856 // Stream reset
857 fprintf(stderr, "NormSocket::GetSocketEvent(): client stream reset?!\n");
858 }
859 break;
860 default: // CONNECTING, ACCEPTING, CLOSING, CLOSED
861 // shouldn't happen
862 break;
863 }
864 break;
865 }
866 case NORM_RX_OBJECT_UPDATED:
867 {
868 switch (socket_state)
869 {
870 case CONNECTED:
871 case CLOSING: // we allow reading during graceful closure
872 // TBD - use an rx_ready indication to filter this event a little more
873 if (IsServerSocket()) break; // we don't receive data on server socket
874 assert(event.object == rx_stream);
875 socketEvent.type = NORM_SOCKET_READ;
876 break;
877 default:
878 // shouldn't happen
879 break;
880 }
881 break;
882 }
883 case NORM_RX_OBJECT_COMPLETED:
884 {
885 rx_stream = NORM_OBJECT_INVALID;
886 switch (socket_state)
887 {
888 case CONNECTED:
889 // Initiate graceful closure of our tx_stream to allow at least some time to
890 // acknowledge the remote before closing everything down
891 NormStreamClose(tx_stream, true); // Note our "trick" here to do a graceful close, _then_ watermark to get ack
892 NormSetWatermark(norm_session, tx_stream, true); // future NORM API will add "bool watermark" option to graceful close
893 socket_state = CLOSING;
894 socketEvent.type = NORM_SOCKET_CLOSING;
895 break;
896
897 case CLOSING:
898 // We're already closing, so just let that complete. This helps make sure we allow
899 // at least some time to acknowledge the remote before closing everything down
900 break;
901 default:
902 // shouldn't happen
903 break;
904 }
905 }
906 default:
907 break;
908 }
909 } // end NormSocket::GetSocketEvent()
910
911
912
913 ///////////////////////////////////////////////////////////////////////////////////
914 // NormSocket API implementation
915
916 // TBD - provide options for binding to a specific local address, interface, etc
NormListen(NormInstanceHandle instance,UINT16 serverPort,const char * groupAddr)917 NormSocketHandle NormListen(NormInstanceHandle instance, UINT16 serverPort, const char* groupAddr)
918 {
919 // TBD - check results
920 NormSocket* normSocket = new NormSocket();
921 normSocket->Listen(instance, serverPort, groupAddr);
922 return (NormSocketHandle)normSocket;
923 } // end NormListen()
924
925
NormAccept(NormSocketHandle serverSocket,NormNodeHandle client)926 NormSocketHandle NormAccept(NormSocketHandle serverSocket, NormNodeHandle client)
927 {
928 // TBD - VALIDATE PARAMETERS AND ERROR CHECK ALL THE API CALLS MADE HERE !!!!!
929 NormSocket* s = (NormSocket*)serverSocket;
930 return (NormSocketHandle)(s->Accept(client));
931 } // end NormAccept()
932
933
934 // TBD - provide options for binding to a specific local address, interface, etc
NormConnect(NormInstanceHandle instance,const char * serverAddr,UINT16 serverPort,const char * groupAddr,NormNodeId clientId)935 NormSocketHandle NormConnect(NormInstanceHandle instance, const char* serverAddr, UINT16 serverPort, const char* groupAddr, NormNodeId clientId)
936 {
937 NormSocket* normSocket = new NormSocket();
938 if (NULL == normSocket)
939 {
940 perror("NormConnect() new NormSocket() error");
941 return NULL;
942 }
943 if (normSocket->Connect(instance, serverAddr, serverPort, groupAddr, clientId))
944 {
945 return normSocket;
946 }
947 else
948 {
949 delete normSocket;
950 return NULL;
951 }
952 } // end NormConnect()
953
954
NormWrite(NormSocketHandle normSocket,const void * buf,size_t nbyte)955 ssize_t NormWrite(NormSocketHandle normSocket, const void *buf, size_t nbyte)
956 {
957 // TBD - we could make write() and read() optionally blocking or non-blocking
958 // by using GetSocketEvent() as appropriate (incl. returning error conditions, etc)
959 NormSocket* s = (NormSocket*)normSocket;
960 return (ssize_t)s->Write((const char*)buf, nbyte);
961 } // end NormWrite()
962
NormFlush(NormSocketHandle normSocket)963 int NormFlush(NormSocketHandle normSocket)
964 {
965 NormSocket* s = (NormSocket*)normSocket;
966 s->Flush();
967 return 0;
968 } // end NormFlush()
969
NormRead(NormSocketHandle normSocket,void * buf,size_t nbyte)970 ssize_t NormRead(NormSocketHandle normSocket, void *buf, size_t nbyte)
971 {
972 // TBD - we could make write() and read() optionally blocking or non-blocking
973 // by using GetSocketEvent() as appropriate (incl. returning error conditions, etc)
974 NormSocket* s = (NormSocket*)normSocket;
975 // TBD - make sure s->rx_stream is valid
976 unsigned int numBytes = nbyte;
977 if (s->Read((char*)buf, numBytes))
978 return numBytes;
979 else
980 return -1; // broken stream error (TBD - enumerate socket error values)
981 } // end NormWrite()
982
983
NormShutdown(NormSocketHandle normSocket)984 void NormShutdown(NormSocketHandle normSocket)
985 {
986 NormSocket* s = (NormSocket*)normSocket;
987 s->Shutdown();
988 } // end NormShutdown()
989
NormClose(NormSocketHandle normSocket)990 void NormClose(NormSocketHandle normSocket)
991 {
992 NormSocket* s = (NormSocket*)normSocket;
993 s->Close();
994 } // end NormClose()
995
996 // This gets and translates low level NORM API events to NormSocket events
997 // given the "normSocket" state
NormGetSocketEvent(NormInstanceHandle instance,NormSocketEvent * socketEvent,bool waitForEvent)998 bool NormGetSocketEvent(NormInstanceHandle instance, NormSocketEvent* socketEvent, bool waitForEvent)
999 {
1000 if (NULL == socketEvent) return false;
1001 NormEvent event;
1002 if (NormGetNextEvent(instance, &event, waitForEvent))
1003 {
1004 NormSocket* normSocket = NULL;
1005 if (NORM_SESSION_INVALID != event.session)
1006 normSocket = (NormSocket*)NormGetUserData(event.session);
1007 if (NULL == normSocket)
1008 {
1009 socketEvent->type = NORM_SOCKET_NONE;
1010 socketEvent->socket = NORM_SOCKET_INVALID;
1011 socketEvent->event = event;
1012 }
1013 else
1014 {
1015 normSocket->GetSocketEvent(event, *socketEvent);
1016 }
1017 return true;
1018 }
1019 else
1020 {
1021 return false;
1022 }
1023 } // end NormGetSocketEvent()
1024
1025 // Other helper functions
1026
NormGetSession(NormSocketHandle normSocket)1027 NormSessionHandle NormGetSession(NormSocketHandle normSocket)
1028 {
1029 NormSocket* s = (NormSocket*)normSocket;
1030 return s->GetSession();
1031 } // end NormGetSession()
1032
NormGetMulticastSession(NormSocketHandle normSocket)1033 NormSessionHandle NormGetMulticastSession(NormSocketHandle normSocket)
1034 {
1035 NormSocket* s = (NormSocket*)normSocket;
1036 return s->GetMulticastSession();
1037 } // end NormGetSession()
1038