1 /**********
2 This library is free software; you can redistribute it and/or modify it under
3 the terms of the GNU Lesser General Public License as published by the
4 Free Software Foundation; either version 3 of the License, or (at your
5 option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)
6 
7 This library is distributed in the hope that it will be useful, but WITHOUT
8 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
9 FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
10 more details.
11 
12 You should have received a copy of the GNU Lesser General Public License
13 along with this library; if not, write to the Free Software Foundation, Inc.,
14 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
15 **********/
16 // "liveMedia"
17 // Copyright (c) 1996-2020 Live Networks, Inc.  All rights reserved.
18 // An abstraction of a network interface used for RTP (or RTCP).
19 // (This allows the RTP-over-TCP hack (RFC 2326, section 10.12) to
20 // be implemented transparently.)
21 // Implementation
22 
23 #include "RTPInterface.hh"
24 #include <GroupsockHelper.hh>
25 #include <stdio.h>
26 
27 ////////// Helper Functions - Definition //////////
28 
29 // Helper routines and data structures, used to implement
30 // sending/receiving RTP/RTCP over a TCP socket:
31 
32 class tcpStreamRecord {
33   public:
34   tcpStreamRecord(int streamSocketNum, unsigned char streamChannelId,
35                   tcpStreamRecord* next);
36   virtual ~tcpStreamRecord();
37 
38 public:
39   tcpStreamRecord* fNext;
40   int fStreamSocketNum;
41   unsigned char fStreamChannelId;
42 };
43 
44 // Reading RTP-over-TCP is implemented using two levels of hash tables.
45 // The top-level hash table maps TCP socket numbers to a
46 // "SocketDescriptor" that contains a hash table for each of the
47 // sub-channels that are reading from this socket.
48 
socketHashTable(UsageEnvironment & env,Boolean createIfNotPresent=True)49 static HashTable* socketHashTable(UsageEnvironment& env, Boolean createIfNotPresent = True) {
50   _Tables* ourTables = _Tables::getOurTables(env, createIfNotPresent);
51   if (ourTables == NULL) return NULL;
52 
53   if (ourTables->socketTable == NULL) {
54     // Create a new socket number -> SocketDescriptor mapping table:
55     ourTables->socketTable = HashTable::create(ONE_WORD_HASH_KEYS);
56   }
57   return (HashTable*)(ourTables->socketTable);
58 }
59 
60 class SocketDescriptor {
61 public:
62   SocketDescriptor(UsageEnvironment& env, int socketNum);
63   virtual ~SocketDescriptor();
64 
65   void registerRTPInterface(unsigned char streamChannelId,
66 			    RTPInterface* rtpInterface);
67   RTPInterface* lookupRTPInterface(unsigned char streamChannelId);
68   void deregisterRTPInterface(unsigned char streamChannelId);
69 
setServerRequestAlternativeByteHandler(ServerRequestAlternativeByteHandler * handler,void * clientData)70   void setServerRequestAlternativeByteHandler(ServerRequestAlternativeByteHandler* handler, void* clientData) {
71     fServerRequestAlternativeByteHandler = handler;
72     fServerRequestAlternativeByteHandlerClientData = clientData;
73   }
74 
75 private:
76   static void tcpReadHandler(SocketDescriptor*, int mask);
77   Boolean tcpReadHandler1(int mask);
78 
79 private:
80   UsageEnvironment& fEnv;
81   int fOurSocketNum;
82   HashTable* fSubChannelHashTable;
83   ServerRequestAlternativeByteHandler* fServerRequestAlternativeByteHandler;
84   void* fServerRequestAlternativeByteHandlerClientData;
85   u_int8_t fStreamChannelId, fSizeByte1;
86   Boolean fReadErrorOccurred, fDeleteMyselfNext, fAreInReadHandlerLoop;
87   enum { AWAITING_DOLLAR, AWAITING_STREAM_CHANNEL_ID, AWAITING_SIZE1, AWAITING_SIZE2, AWAITING_PACKET_DATA } fTCPReadingState;
88 };
89 
lookupSocketDescriptor(UsageEnvironment & env,int sockNum,Boolean createIfNotFound=True)90 static SocketDescriptor* lookupSocketDescriptor(UsageEnvironment& env, int sockNum, Boolean createIfNotFound = True) {
91   HashTable* table = socketHashTable(env, createIfNotFound);
92   if (table == NULL) return NULL;
93 
94   char const* key = (char const*)(long)sockNum;
95   SocketDescriptor* socketDescriptor = (SocketDescriptor*)(table->Lookup(key));
96   if (socketDescriptor == NULL) {
97     if (createIfNotFound) {
98       socketDescriptor = new SocketDescriptor(env, sockNum);
99       table->Add((char const*)(long)(sockNum), socketDescriptor);
100     } else if (table->IsEmpty()) {
101       // We can also delete the table (to reclaim space):
102       _Tables* ourTables = _Tables::getOurTables(env);
103       delete table;
104       ourTables->socketTable = NULL;
105       ourTables->reclaimIfPossible();
106     }
107   }
108 
109   return socketDescriptor;
110 }
111 
removeSocketDescription(UsageEnvironment & env,int sockNum)112 static void removeSocketDescription(UsageEnvironment& env, int sockNum) {
113   char const* key = (char const*)(long)sockNum;
114   HashTable* table = socketHashTable(env);
115   table->Remove(key);
116 
117   if (table->IsEmpty()) {
118     // We can also delete the table (to reclaim space):
119     _Tables* ourTables = _Tables::getOurTables(env);
120     delete table;
121     ourTables->socketTable = NULL;
122     ourTables->reclaimIfPossible();
123   }
124 }
125 
126 
127 ////////// RTPInterface - Implementation //////////
128 
RTPInterface(Medium * owner,Groupsock * gs)129 RTPInterface::RTPInterface(Medium* owner, Groupsock* gs)
130   : fOwner(owner), fGS(gs),
131     fTCPStreams(NULL),
132     fNextTCPReadSize(0), fNextTCPReadStreamSocketNum(-1),
133     fNextTCPReadStreamChannelId(0xFF), fReadHandlerProc(NULL),
134     fAuxReadHandlerFunc(NULL), fAuxReadHandlerClientData(NULL) {
135   // Make the socket non-blocking, even though it will be read from only asynchronously, when packets arrive.
136   // The reason for this is that, in some OSs, reads on a blocking socket can (allegedly) sometimes block,
137   // even if the socket was previously reported (e.g., by "select()") as having data available.
138   // (This can supposedly happen if the UDP checksum fails, for example.)
139   makeSocketNonBlocking(fGS->socketNum());
140   increaseSendBufferTo(envir(), fGS->socketNum(), 50*1024);
141 }
142 
~RTPInterface()143 RTPInterface::~RTPInterface() {
144   stopNetworkReading();
145   delete fTCPStreams;
146 }
147 
setStreamSocket(int sockNum,unsigned char streamChannelId)148 void RTPInterface::setStreamSocket(int sockNum,
149 				   unsigned char streamChannelId) {
150   fGS->removeAllDestinations();
151   envir().taskScheduler().disableBackgroundHandling(fGS->socketNum()); // turn off any reading on our datagram socket
152   fGS->reset(); // and close our datagram socket, because we won't be using it anymore
153 
154   addStreamSocket(sockNum, streamChannelId);
155 }
156 
addStreamSocket(int sockNum,unsigned char streamChannelId)157 void RTPInterface::addStreamSocket(int sockNum,
158 				   unsigned char streamChannelId) {
159   if (sockNum < 0) return;
160 
161   for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
162        streams = streams->fNext) {
163     if (streams->fStreamSocketNum == sockNum
164 	&& streams->fStreamChannelId == streamChannelId) {
165       return; // we already have it
166     }
167   }
168 
169   fTCPStreams = new tcpStreamRecord(sockNum, streamChannelId, fTCPStreams);
170 
171   // Also, make sure this new socket is set up for receiving RTP/RTCP-over-TCP:
172   SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), sockNum);
173   socketDescriptor->registerRTPInterface(streamChannelId, this);
174 }
175 
deregisterSocket(UsageEnvironment & env,int sockNum,unsigned char streamChannelId)176 static void deregisterSocket(UsageEnvironment& env, int sockNum, unsigned char streamChannelId) {
177   SocketDescriptor* socketDescriptor = lookupSocketDescriptor(env, sockNum, False);
178   if (socketDescriptor != NULL) {
179     socketDescriptor->deregisterRTPInterface(streamChannelId);
180         // Note: This may delete "socketDescriptor",
181         // if no more interfaces are using this socket
182   }
183 }
184 
removeStreamSocket(int sockNum,unsigned char streamChannelId)185 void RTPInterface::removeStreamSocket(int sockNum,
186 				      unsigned char streamChannelId) {
187   // Remove - from our list of 'TCP streams' - the record of the (sockNum,streamChannelId) pair.
188   // (However "streamChannelId" == 0xFF is a special case, meaning remove all
189   //  (sockNum,*) pairs.)
190 
191   while (1) {
192     tcpStreamRecord** streamsPtr = &fTCPStreams;
193 
194     while (*streamsPtr != NULL) {
195       if ((*streamsPtr)->fStreamSocketNum == sockNum
196 	  && (streamChannelId == 0xFF || streamChannelId == (*streamsPtr)->fStreamChannelId)) {
197 	// Delete the record pointed to by *streamsPtr :
198 	unsigned char streamChannelIdToRemove = (*streamsPtr)->fStreamChannelId;
199 	tcpStreamRecord* next = (*streamsPtr)->fNext;
200 	(*streamsPtr)->fNext = NULL;
201 	delete (*streamsPtr);
202 	*streamsPtr = next;
203 
204 	// And 'deregister' this socket,channelId pair:
205 	deregisterSocket(envir(), sockNum, streamChannelIdToRemove);
206 
207 	if (streamChannelId != 0xFF) return; // we're done
208 	break; // start again from the beginning of the list, in case the list has changed
209       } else {
210 	streamsPtr = &((*streamsPtr)->fNext);
211       }
212     }
213     if (*streamsPtr == NULL) break;
214   }
215 }
216 
setServerRequestAlternativeByteHandler(UsageEnvironment & env,int socketNum,ServerRequestAlternativeByteHandler * handler,void * clientData)217 void RTPInterface::setServerRequestAlternativeByteHandler(UsageEnvironment& env, int socketNum,
218 							  ServerRequestAlternativeByteHandler* handler, void* clientData) {
219   SocketDescriptor* socketDescriptor = lookupSocketDescriptor(env, socketNum, False);
220 
221   if (socketDescriptor != NULL) socketDescriptor->setServerRequestAlternativeByteHandler(handler, clientData);
222 }
223 
clearServerRequestAlternativeByteHandler(UsageEnvironment & env,int socketNum)224 void RTPInterface::clearServerRequestAlternativeByteHandler(UsageEnvironment& env, int socketNum) {
225   setServerRequestAlternativeByteHandler(env, socketNum, NULL, NULL);
226 }
227 
sendPacket(unsigned char * packet,unsigned packetSize)228 Boolean RTPInterface::sendPacket(unsigned char* packet, unsigned packetSize) {
229   Boolean success = True; // we'll return False instead if any of the sends fail
230 
231   // Normal case: Send as a UDP packet:
232   if (!fGS->output(envir(), packet, packetSize)) success = False;
233 
234   // Also, send over each of our TCP sockets:
235   tcpStreamRecord* nextStream;
236   for (tcpStreamRecord* stream = fTCPStreams; stream != NULL; stream = nextStream) {
237     nextStream = stream->fNext; // Set this now, in case the following deletes "stream":
238     if (!sendRTPorRTCPPacketOverTCP(packet, packetSize,
239 				    stream->fStreamSocketNum, stream->fStreamChannelId)) {
240       success = False;
241     }
242   }
243 
244   return success;
245 }
246 
247 void RTPInterface
startNetworkReading(TaskScheduler::BackgroundHandlerProc * handlerProc)248 ::startNetworkReading(TaskScheduler::BackgroundHandlerProc* handlerProc) {
249   // Normal case: Arrange to read UDP packets:
250   envir().taskScheduler().
251     turnOnBackgroundReadHandling(fGS->socketNum(), handlerProc, fOwner);
252 
253   // Also, receive RTP over TCP, on each of our TCP connections:
254   fReadHandlerProc = handlerProc;
255   for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
256        streams = streams->fNext) {
257     // Get a socket descriptor for "streams->fStreamSocketNum":
258     SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), streams->fStreamSocketNum);
259 
260     // Tell it about our subChannel:
261     socketDescriptor->registerRTPInterface(streams->fStreamChannelId, this);
262   }
263 }
264 
handleRead(unsigned char * buffer,unsigned bufferMaxSize,unsigned & bytesRead,struct sockaddr_storage & fromAddress,int & tcpSocketNum,unsigned char & tcpStreamChannelId,Boolean & packetReadWasIncomplete)265 Boolean RTPInterface::handleRead(unsigned char* buffer, unsigned bufferMaxSize,
266 				 unsigned& bytesRead, struct sockaddr_storage& fromAddress,
267 				 int& tcpSocketNum, unsigned char& tcpStreamChannelId,
268 				 Boolean& packetReadWasIncomplete) {
269   packetReadWasIncomplete = False; // by default
270   Boolean readSuccess;
271   if (fNextTCPReadStreamSocketNum < 0) {
272     // Normal case: read from the (datagram) 'groupsock':
273     tcpSocketNum = -1;
274     readSuccess = fGS->handleRead(buffer, bufferMaxSize, bytesRead, fromAddress);
275   } else {
276     // Read from the TCP connection:
277     tcpSocketNum = fNextTCPReadStreamSocketNum;
278     tcpStreamChannelId = fNextTCPReadStreamChannelId;
279 
280     bytesRead = 0;
281     unsigned totBytesToRead = fNextTCPReadSize;
282     if (totBytesToRead > bufferMaxSize) totBytesToRead = bufferMaxSize;
283     unsigned curBytesToRead = totBytesToRead;
284     int curBytesRead;
285     while ((curBytesRead = readSocket(envir(), fNextTCPReadStreamSocketNum,
286 				      &buffer[bytesRead], curBytesToRead,
287 				      fromAddress)) > 0) {
288       bytesRead += curBytesRead;
289       if (bytesRead >= totBytesToRead) break;
290       curBytesToRead -= curBytesRead;
291     }
292     fNextTCPReadSize -= bytesRead;
293     if (fNextTCPReadSize == 0) {
294       // We've read all of the data that we asked for
295       readSuccess = True;
296     } else if (curBytesRead < 0) {
297       // There was an error reading the socket
298       bytesRead = 0;
299       readSuccess = False;
300     } else {
301       // We need to read more bytes, and there was not an error reading the socket
302       packetReadWasIncomplete = True;
303       return True;
304     }
305     fNextTCPReadStreamSocketNum = -1; // default, for next time
306   }
307 
308   if (readSuccess && fAuxReadHandlerFunc != NULL) {
309     // Also pass the newly-read packet data to our auxilliary handler:
310     (*fAuxReadHandlerFunc)(fAuxReadHandlerClientData, buffer, bytesRead);
311   }
312   return readSuccess;
313 }
314 
stopNetworkReading()315 void RTPInterface::stopNetworkReading() {
316   // Normal case
317   if (fGS != NULL) envir().taskScheduler().turnOffBackgroundReadHandling(fGS->socketNum());
318 
319   // Also turn off read handling on each of our TCP connections:
320   for (tcpStreamRecord* streams = fTCPStreams; streams != NULL; streams = streams->fNext) {
321     deregisterSocket(envir(), streams->fStreamSocketNum, streams->fStreamChannelId);
322   }
323 }
324 
325 
326 ////////// Helper Functions - Implementation /////////
327 
sendRTPorRTCPPacketOverTCP(u_int8_t * packet,unsigned packetSize,int socketNum,unsigned char streamChannelId)328 Boolean RTPInterface::sendRTPorRTCPPacketOverTCP(u_int8_t* packet, unsigned packetSize,
329 						 int socketNum, unsigned char streamChannelId) {
330 #ifdef DEBUG_SEND
331   fprintf(stderr, "sendRTPorRTCPPacketOverTCP: %d bytes over channel %d (socket %d)\n",
332 	  packetSize, streamChannelId, socketNum); fflush(stderr);
333 #endif
334   // Send a RTP/RTCP packet over TCP, using the encoding defined in RFC 2326, section 10.12:
335   //     $<streamChannelId><packetSize><packet>
336   // (If the initial "send()" of '$<streamChannelId><packetSize>' succeeds, then we force
337   // the subsequent "send()" for the <packet> data to succeed, even if we have to do so with
338   // a blocking "send()".)
339   do {
340     u_int8_t framingHeader[4];
341     framingHeader[0] = '$';
342     framingHeader[1] = streamChannelId;
343     framingHeader[2] = (u_int8_t) ((packetSize&0xFF00)>>8);
344     framingHeader[3] = (u_int8_t) (packetSize&0xFF);
345     if (!sendDataOverTCP(socketNum, framingHeader, 4, False)) break;
346 
347     if (!sendDataOverTCP(socketNum, packet, packetSize, True)) break;
348 #ifdef DEBUG_SEND
349     fprintf(stderr, "sendRTPorRTCPPacketOverTCP: completed\n"); fflush(stderr);
350 #endif
351 
352     return True;
353   } while (0);
354 
355 #ifdef DEBUG_SEND
356   fprintf(stderr, "sendRTPorRTCPPacketOverTCP: failed! (errno %d)\n", envir().getErrno()); fflush(stderr);
357 #endif
358   return False;
359 }
360 
361 #ifndef RTPINTERFACE_BLOCKING_WRITE_TIMEOUT_MS
362 #define RTPINTERFACE_BLOCKING_WRITE_TIMEOUT_MS 500
363 #endif
364 
sendDataOverTCP(int socketNum,u_int8_t const * data,unsigned dataSize,Boolean forceSendToSucceed)365 Boolean RTPInterface::sendDataOverTCP(int socketNum, u_int8_t const* data, unsigned dataSize, Boolean forceSendToSucceed) {
366   int sendResult = send(socketNum, (char const*)data, dataSize, 0/*flags*/);
367   if (sendResult < (int)dataSize) {
368     // The TCP send() failed - at least partially.
369 
370     unsigned numBytesSentSoFar = sendResult < 0 ? 0 : (unsigned)sendResult;
371     if (numBytesSentSoFar > 0 || (forceSendToSucceed && envir().getErrno() == EAGAIN)) {
372       // The OS's TCP send buffer has filled up (because the stream's bitrate has exceeded
373       // the capacity of the TCP connection!).
374       // Force this data write to succeed, by blocking if necessary until it does:
375       unsigned numBytesRemainingToSend = dataSize - numBytesSentSoFar;
376 #ifdef DEBUG_SEND
377       fprintf(stderr, "sendDataOverTCP: resending %d-byte send (blocking)\n", numBytesRemainingToSend); fflush(stderr);
378 #endif
379       makeSocketBlocking(socketNum, RTPINTERFACE_BLOCKING_WRITE_TIMEOUT_MS);
380       sendResult = send(socketNum, (char const*)(&data[numBytesSentSoFar]), numBytesRemainingToSend, 0/*flags*/);
381       if ((unsigned)sendResult != numBytesRemainingToSend) {
382 	// The blocking "send()" failed, or timed out.  In either case, we assume that the
383 	// TCP connection has failed (or is 'hanging' indefinitely), and we stop using it
384 	// (for both RTP and RTP).
385 	// (If we kept using the socket here, the RTP or RTCP packet write would be in an
386 	//  incomplete, inconsistent state.)
387 #ifdef DEBUG_SEND
388 	fprintf(stderr, "sendDataOverTCP: blocking send() failed (delivering %d bytes out of %d); closing socket %d\n", sendResult, numBytesRemainingToSend, socketNum); fflush(stderr);
389 #endif
390 	removeStreamSocket(socketNum, 0xFF);
391 	return False;
392       }
393       makeSocketNonBlocking(socketNum);
394 
395       return True;
396     } else if (sendResult < 0 && envir().getErrno() != EAGAIN) {
397       // Because the "send()" call failed, assume that the socket is now unusable, so stop
398       // using it (for both RTP and RTCP):
399       removeStreamSocket(socketNum, 0xFF);
400     }
401 
402     return False;
403   }
404 
405   return True;
406 }
407 
SocketDescriptor(UsageEnvironment & env,int socketNum)408 SocketDescriptor::SocketDescriptor(UsageEnvironment& env, int socketNum)
409   :fEnv(env), fOurSocketNum(socketNum),
410     fSubChannelHashTable(HashTable::create(ONE_WORD_HASH_KEYS)),
411    fServerRequestAlternativeByteHandler(NULL), fServerRequestAlternativeByteHandlerClientData(NULL),
412    fReadErrorOccurred(False), fDeleteMyselfNext(False), fAreInReadHandlerLoop(False), fTCPReadingState(AWAITING_DOLLAR) {
413 }
414 
~SocketDescriptor()415 SocketDescriptor::~SocketDescriptor() {
416   fEnv.taskScheduler().turnOffBackgroundReadHandling(fOurSocketNum);
417   removeSocketDescription(fEnv, fOurSocketNum);
418 
419   if (fSubChannelHashTable != NULL) {
420     // Remove knowledge of this socket from any "RTPInterface"s that are using it:
421     HashTable::Iterator* iter = HashTable::Iterator::create(*fSubChannelHashTable);
422     RTPInterface* rtpInterface;
423     char const* key;
424 
425     while ((rtpInterface = (RTPInterface*)(iter->next(key))) != NULL) {
426       u_int64_t streamChannelIdLong = (u_int64_t)key;
427       unsigned char streamChannelId = (unsigned char)streamChannelIdLong;
428 
429       rtpInterface->removeStreamSocket(fOurSocketNum, streamChannelId);
430     }
431     delete iter;
432 
433     // Then remove the hash table entries themselves, and then remove the hash table:
434     while (fSubChannelHashTable->RemoveNext() != NULL) {}
435     delete fSubChannelHashTable;
436   }
437 
438   // Finally:
439   if (fServerRequestAlternativeByteHandler != NULL) {
440     // Hack: Pass a special character to our alternative byte handler, to tell it that either
441     // - an error occurred when reading the TCP socket, or
442     // - no error occurred, but it needs to take over control of the TCP socket once again.
443     u_int8_t specialChar = fReadErrorOccurred ? 0xFF : 0xFE;
444     (*fServerRequestAlternativeByteHandler)(fServerRequestAlternativeByteHandlerClientData, specialChar);
445   }
446 }
447 
registerRTPInterface(unsigned char streamChannelId,RTPInterface * rtpInterface)448 void SocketDescriptor::registerRTPInterface(unsigned char streamChannelId,
449 					    RTPInterface* rtpInterface) {
450   Boolean isFirstRegistration = fSubChannelHashTable->IsEmpty();
451 #if defined(DEBUG_SEND)||defined(DEBUG_RECEIVE)
452   fprintf(stderr, "SocketDescriptor(socket %d)::registerRTPInterface(channel %d): isFirstRegistration %d\n", fOurSocketNum, streamChannelId, isFirstRegistration);
453 #endif
454   fSubChannelHashTable->Add((char const*)(long)streamChannelId,
455 			    rtpInterface);
456 
457   if (isFirstRegistration) {
458     // Arrange to handle reads on this TCP socket:
459     TaskScheduler::BackgroundHandlerProc* handler
460       = (TaskScheduler::BackgroundHandlerProc*)&tcpReadHandler;
461     fEnv.taskScheduler().
462       setBackgroundHandling(fOurSocketNum, SOCKET_READABLE|SOCKET_EXCEPTION, handler, this);
463   }
464 }
465 
466 RTPInterface* SocketDescriptor
lookupRTPInterface(unsigned char streamChannelId)467 ::lookupRTPInterface(unsigned char streamChannelId) {
468   char const* lookupArg = (char const*)(long)streamChannelId;
469   return (RTPInterface*)(fSubChannelHashTable->Lookup(lookupArg));
470 }
471 
472 void SocketDescriptor
deregisterRTPInterface(unsigned char streamChannelId)473 ::deregisterRTPInterface(unsigned char streamChannelId) {
474 #if defined(DEBUG_SEND)||defined(DEBUG_RECEIVE)
475   fprintf(stderr, "SocketDescriptor(socket %d)::deregisterRTPInterface(channel %d)\n", fOurSocketNum, streamChannelId);
476 #endif
477   fSubChannelHashTable->Remove((char const*)(long)streamChannelId);
478 
479   if (fSubChannelHashTable->IsEmpty()) {
480     // No more interfaces are using us, so it's curtains for us now:
481     if (fAreInReadHandlerLoop) {
482       fDeleteMyselfNext = True; // we can't delete ourself yet, but we'll do so from "tcpReadHandler()" below
483     } else {
484       delete this;
485     }
486   }
487 }
488 
tcpReadHandler(SocketDescriptor * socketDescriptor,int mask)489 void SocketDescriptor::tcpReadHandler(SocketDescriptor* socketDescriptor, int mask) {
490   // Call the read handler until it returns false, with a limit to avoid starving other sockets
491   unsigned count = 2000;
492   socketDescriptor->fAreInReadHandlerLoop = True;
493   while (!socketDescriptor->fDeleteMyselfNext && socketDescriptor->tcpReadHandler1(mask) && --count > 0) {}
494   socketDescriptor->fAreInReadHandlerLoop = False;
495   if (socketDescriptor->fDeleteMyselfNext) delete socketDescriptor;
496 }
497 
tcpReadHandler1(int mask)498 Boolean SocketDescriptor::tcpReadHandler1(int mask) {
499   // We expect the following data over the TCP channel:
500   //   optional RTSP command or response bytes (before the first '$' character)
501   //   a '$' character
502   //   a 1-byte channel id
503   //   a 2-byte packet size (in network byte order)
504   //   the packet data.
505   // However, because the socket is being read asynchronously, this data might arrive in pieces.
506 
507   u_int8_t c;
508   struct sockaddr_storage fromAddress;
509   if (fTCPReadingState != AWAITING_PACKET_DATA) {
510     int result = readSocket(fEnv, fOurSocketNum, &c, 1, fromAddress);
511     if (result == 0) { // There was no more data to read
512       return False;
513     } else if (result != 1) { // error reading TCP socket, so we will no longer handle it
514 #ifdef DEBUG_RECEIVE
515       fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): readSocket(1 byte) returned %d (error)\n", fOurSocketNum, result);
516 #endif
517       fReadErrorOccurred = True;
518       fDeleteMyselfNext = True;
519       return False;
520     }
521   }
522 
523   Boolean callAgain = True;
524   switch (fTCPReadingState) {
525     case AWAITING_DOLLAR: {
526       if (c == '$') {
527 #ifdef DEBUG_RECEIVE
528 	fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): Saw '$'\n", fOurSocketNum);
529 #endif
530 	fTCPReadingState = AWAITING_STREAM_CHANNEL_ID;
531       } else {
532 	// This character is part of a RTSP request or command, which is handled separately:
533 	if (fServerRequestAlternativeByteHandler != NULL && c != 0xFF && c != 0xFE) {
534 	  // Hack: 0xFF and 0xFE are used as special signaling characters, so don't send them
535 	  (*fServerRequestAlternativeByteHandler)(fServerRequestAlternativeByteHandlerClientData, c);
536 	}
537       }
538       break;
539     }
540     case AWAITING_STREAM_CHANNEL_ID: {
541       // The byte that we read is the stream channel id.
542       if (lookupRTPInterface(c) != NULL) { // sanity check
543 	fStreamChannelId = c;
544 	fTCPReadingState = AWAITING_SIZE1;
545       } else {
546 	// This wasn't a stream channel id that we expected.  We're (somehow) in a strange state.  Try to recover:
547 #ifdef DEBUG_RECEIVE
548 	fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): Saw nonexistent stream channel id: 0x%02x\n", fOurSocketNum, c);
549 #endif
550 	fTCPReadingState = AWAITING_DOLLAR;
551       }
552       break;
553     }
554     case AWAITING_SIZE1: {
555       // The byte that we read is the first (high) byte of the 16-bit RTP or RTCP packet 'size'.
556       fSizeByte1 = c;
557       fTCPReadingState = AWAITING_SIZE2;
558       break;
559     }
560     case AWAITING_SIZE2: {
561       // The byte that we read is the second (low) byte of the 16-bit RTP or RTCP packet 'size'.
562       unsigned short size = (fSizeByte1<<8)|c;
563 
564       // Record the information about the packet data that will be read next:
565       RTPInterface* rtpInterface = lookupRTPInterface(fStreamChannelId);
566       if (rtpInterface != NULL) {
567 	rtpInterface->fNextTCPReadSize = size;
568 	rtpInterface->fNextTCPReadStreamSocketNum = fOurSocketNum;
569 	rtpInterface->fNextTCPReadStreamChannelId = fStreamChannelId;
570       }
571       fTCPReadingState = AWAITING_PACKET_DATA;
572       break;
573     }
574     case AWAITING_PACKET_DATA: {
575       callAgain = False;
576       fTCPReadingState = AWAITING_DOLLAR; // the next state, unless we end up having to read more data in the current state
577       // Call the appropriate read handler to get the packet data from the TCP stream:
578       RTPInterface* rtpInterface = lookupRTPInterface(fStreamChannelId);
579       if (rtpInterface != NULL) {
580 	if (rtpInterface->fNextTCPReadSize == 0) {
581 	  // We've already read all the data for this packet.
582 	  break;
583 	}
584 	if (rtpInterface->fReadHandlerProc != NULL) {
585 #ifdef DEBUG_RECEIVE
586 	  fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): reading %d bytes on channel %d\n", fOurSocketNum, rtpInterface->fNextTCPReadSize, rtpInterface->fNextTCPReadStreamChannelId);
587 #endif
588 	  fTCPReadingState = AWAITING_PACKET_DATA;
589 	  rtpInterface->fReadHandlerProc(rtpInterface->fOwner, mask);
590 	} else {
591 #ifdef DEBUG_RECEIVE
592 	  fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): No handler proc for \"rtpInterface\" for channel %d; need to skip %d remaining bytes\n", fOurSocketNum, fStreamChannelId, rtpInterface->fNextTCPReadSize);
593 #endif
594 	  int result = readSocket(fEnv, fOurSocketNum, &c, 1, fromAddress);
595 	  if (result < 0) { // error reading TCP socket, so we will no longer handle it
596 #ifdef DEBUG_RECEIVE
597 	    fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): readSocket(1 byte) returned %d (error)\n", fOurSocketNum, result);
598 #endif
599 	    fReadErrorOccurred = True;
600 	    fDeleteMyselfNext = True;
601 	    return False;
602 	  } else {
603 	    fTCPReadingState = AWAITING_PACKET_DATA;
604 	    if (result == 1) {
605 	      --rtpInterface->fNextTCPReadSize;
606 	      callAgain = True;
607 	    }
608 	  }
609 	}
610       }
611 #ifdef DEBUG_RECEIVE
612       else fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): No \"rtpInterface\" for channel %d\n", fOurSocketNum, fStreamChannelId);
613 #endif
614     }
615   }
616 
617   return callAgain;
618 }
619 
620 
621 ////////// tcpStreamRecord implementation //////////
622 
623 tcpStreamRecord
tcpStreamRecord(int streamSocketNum,unsigned char streamChannelId,tcpStreamRecord * next)624 ::tcpStreamRecord(int streamSocketNum, unsigned char streamChannelId,
625 		  tcpStreamRecord* next)
626   : fNext(next),
627     fStreamSocketNum(streamSocketNum), fStreamChannelId(streamChannelId) {
628 }
629 
~tcpStreamRecord()630 tcpStreamRecord::~tcpStreamRecord() {
631   delete fNext;
632 }
633