1 /**********
2 This library is free software; you can redistribute it and/or modify it under
3 the terms of the GNU Lesser General Public License as published by the
4 Free Software Foundation; either version 3 of the License, or (at your
5 option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)
6
7 This library is distributed in the hope that it will be useful, but WITHOUT
8 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
9 FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for
10 more details.
11
12 You should have received a copy of the GNU Lesser General Public License
13 along with this library; if not, write to the Free Software Foundation, Inc.,
14 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
15 **********/
16 // "liveMedia"
17 // Copyright (c) 1996-2020 Live Networks, Inc. All rights reserved.
18 // An abstraction of a network interface used for RTP (or RTCP).
19 // (This allows the RTP-over-TCP hack (RFC 2326, section 10.12) to
20 // be implemented transparently.)
21 // Implementation
22
23 #include "RTPInterface.hh"
24 #include <GroupsockHelper.hh>
25 #include <stdio.h>
26
27 ////////// Helper Functions - Definition //////////
28
29 // Helper routines and data structures, used to implement
30 // sending/receiving RTP/RTCP over a TCP socket:
31
32 class tcpStreamRecord {
33 public:
34 tcpStreamRecord(int streamSocketNum, unsigned char streamChannelId,
35 tcpStreamRecord* next);
36 virtual ~tcpStreamRecord();
37
38 public:
39 tcpStreamRecord* fNext;
40 int fStreamSocketNum;
41 unsigned char fStreamChannelId;
42 };
43
44 // Reading RTP-over-TCP is implemented using two levels of hash tables.
45 // The top-level hash table maps TCP socket numbers to a
46 // "SocketDescriptor" that contains a hash table for each of the
47 // sub-channels that are reading from this socket.
48
socketHashTable(UsageEnvironment & env,Boolean createIfNotPresent=True)49 static HashTable* socketHashTable(UsageEnvironment& env, Boolean createIfNotPresent = True) {
50 _Tables* ourTables = _Tables::getOurTables(env, createIfNotPresent);
51 if (ourTables == NULL) return NULL;
52
53 if (ourTables->socketTable == NULL) {
54 // Create a new socket number -> SocketDescriptor mapping table:
55 ourTables->socketTable = HashTable::create(ONE_WORD_HASH_KEYS);
56 }
57 return (HashTable*)(ourTables->socketTable);
58 }
59
60 class SocketDescriptor {
61 public:
62 SocketDescriptor(UsageEnvironment& env, int socketNum);
63 virtual ~SocketDescriptor();
64
65 void registerRTPInterface(unsigned char streamChannelId,
66 RTPInterface* rtpInterface);
67 RTPInterface* lookupRTPInterface(unsigned char streamChannelId);
68 void deregisterRTPInterface(unsigned char streamChannelId);
69
setServerRequestAlternativeByteHandler(ServerRequestAlternativeByteHandler * handler,void * clientData)70 void setServerRequestAlternativeByteHandler(ServerRequestAlternativeByteHandler* handler, void* clientData) {
71 fServerRequestAlternativeByteHandler = handler;
72 fServerRequestAlternativeByteHandlerClientData = clientData;
73 }
74
75 private:
76 static void tcpReadHandler(SocketDescriptor*, int mask);
77 Boolean tcpReadHandler1(int mask);
78
79 private:
80 UsageEnvironment& fEnv;
81 int fOurSocketNum;
82 HashTable* fSubChannelHashTable;
83 ServerRequestAlternativeByteHandler* fServerRequestAlternativeByteHandler;
84 void* fServerRequestAlternativeByteHandlerClientData;
85 u_int8_t fStreamChannelId, fSizeByte1;
86 Boolean fReadErrorOccurred, fDeleteMyselfNext, fAreInReadHandlerLoop;
87 enum { AWAITING_DOLLAR, AWAITING_STREAM_CHANNEL_ID, AWAITING_SIZE1, AWAITING_SIZE2, AWAITING_PACKET_DATA } fTCPReadingState;
88 };
89
lookupSocketDescriptor(UsageEnvironment & env,int sockNum,Boolean createIfNotFound=True)90 static SocketDescriptor* lookupSocketDescriptor(UsageEnvironment& env, int sockNum, Boolean createIfNotFound = True) {
91 HashTable* table = socketHashTable(env, createIfNotFound);
92 if (table == NULL) return NULL;
93
94 char const* key = (char const*)(long)sockNum;
95 SocketDescriptor* socketDescriptor = (SocketDescriptor*)(table->Lookup(key));
96 if (socketDescriptor == NULL) {
97 if (createIfNotFound) {
98 socketDescriptor = new SocketDescriptor(env, sockNum);
99 table->Add((char const*)(long)(sockNum), socketDescriptor);
100 } else if (table->IsEmpty()) {
101 // We can also delete the table (to reclaim space):
102 _Tables* ourTables = _Tables::getOurTables(env);
103 delete table;
104 ourTables->socketTable = NULL;
105 ourTables->reclaimIfPossible();
106 }
107 }
108
109 return socketDescriptor;
110 }
111
removeSocketDescription(UsageEnvironment & env,int sockNum)112 static void removeSocketDescription(UsageEnvironment& env, int sockNum) {
113 char const* key = (char const*)(long)sockNum;
114 HashTable* table = socketHashTable(env);
115 table->Remove(key);
116
117 if (table->IsEmpty()) {
118 // We can also delete the table (to reclaim space):
119 _Tables* ourTables = _Tables::getOurTables(env);
120 delete table;
121 ourTables->socketTable = NULL;
122 ourTables->reclaimIfPossible();
123 }
124 }
125
126
127 ////////// RTPInterface - Implementation //////////
128
RTPInterface(Medium * owner,Groupsock * gs)129 RTPInterface::RTPInterface(Medium* owner, Groupsock* gs)
130 : fOwner(owner), fGS(gs),
131 fTCPStreams(NULL),
132 fNextTCPReadSize(0), fNextTCPReadStreamSocketNum(-1),
133 fNextTCPReadStreamChannelId(0xFF), fReadHandlerProc(NULL),
134 fAuxReadHandlerFunc(NULL), fAuxReadHandlerClientData(NULL) {
135 // Make the socket non-blocking, even though it will be read from only asynchronously, when packets arrive.
136 // The reason for this is that, in some OSs, reads on a blocking socket can (allegedly) sometimes block,
137 // even if the socket was previously reported (e.g., by "select()") as having data available.
138 // (This can supposedly happen if the UDP checksum fails, for example.)
139 makeSocketNonBlocking(fGS->socketNum());
140 increaseSendBufferTo(envir(), fGS->socketNum(), 50*1024);
141 }
142
~RTPInterface()143 RTPInterface::~RTPInterface() {
144 stopNetworkReading();
145 delete fTCPStreams;
146 }
147
setStreamSocket(int sockNum,unsigned char streamChannelId)148 void RTPInterface::setStreamSocket(int sockNum,
149 unsigned char streamChannelId) {
150 fGS->removeAllDestinations();
151 envir().taskScheduler().disableBackgroundHandling(fGS->socketNum()); // turn off any reading on our datagram socket
152 fGS->reset(); // and close our datagram socket, because we won't be using it anymore
153
154 addStreamSocket(sockNum, streamChannelId);
155 }
156
addStreamSocket(int sockNum,unsigned char streamChannelId)157 void RTPInterface::addStreamSocket(int sockNum,
158 unsigned char streamChannelId) {
159 if (sockNum < 0) return;
160
161 for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
162 streams = streams->fNext) {
163 if (streams->fStreamSocketNum == sockNum
164 && streams->fStreamChannelId == streamChannelId) {
165 return; // we already have it
166 }
167 }
168
169 fTCPStreams = new tcpStreamRecord(sockNum, streamChannelId, fTCPStreams);
170
171 // Also, make sure this new socket is set up for receiving RTP/RTCP-over-TCP:
172 SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), sockNum);
173 socketDescriptor->registerRTPInterface(streamChannelId, this);
174 }
175
deregisterSocket(UsageEnvironment & env,int sockNum,unsigned char streamChannelId)176 static void deregisterSocket(UsageEnvironment& env, int sockNum, unsigned char streamChannelId) {
177 SocketDescriptor* socketDescriptor = lookupSocketDescriptor(env, sockNum, False);
178 if (socketDescriptor != NULL) {
179 socketDescriptor->deregisterRTPInterface(streamChannelId);
180 // Note: This may delete "socketDescriptor",
181 // if no more interfaces are using this socket
182 }
183 }
184
removeStreamSocket(int sockNum,unsigned char streamChannelId)185 void RTPInterface::removeStreamSocket(int sockNum,
186 unsigned char streamChannelId) {
187 // Remove - from our list of 'TCP streams' - the record of the (sockNum,streamChannelId) pair.
188 // (However "streamChannelId" == 0xFF is a special case, meaning remove all
189 // (sockNum,*) pairs.)
190
191 while (1) {
192 tcpStreamRecord** streamsPtr = &fTCPStreams;
193
194 while (*streamsPtr != NULL) {
195 if ((*streamsPtr)->fStreamSocketNum == sockNum
196 && (streamChannelId == 0xFF || streamChannelId == (*streamsPtr)->fStreamChannelId)) {
197 // Delete the record pointed to by *streamsPtr :
198 unsigned char streamChannelIdToRemove = (*streamsPtr)->fStreamChannelId;
199 tcpStreamRecord* next = (*streamsPtr)->fNext;
200 (*streamsPtr)->fNext = NULL;
201 delete (*streamsPtr);
202 *streamsPtr = next;
203
204 // And 'deregister' this socket,channelId pair:
205 deregisterSocket(envir(), sockNum, streamChannelIdToRemove);
206
207 if (streamChannelId != 0xFF) return; // we're done
208 break; // start again from the beginning of the list, in case the list has changed
209 } else {
210 streamsPtr = &((*streamsPtr)->fNext);
211 }
212 }
213 if (*streamsPtr == NULL) break;
214 }
215 }
216
setServerRequestAlternativeByteHandler(UsageEnvironment & env,int socketNum,ServerRequestAlternativeByteHandler * handler,void * clientData)217 void RTPInterface::setServerRequestAlternativeByteHandler(UsageEnvironment& env, int socketNum,
218 ServerRequestAlternativeByteHandler* handler, void* clientData) {
219 SocketDescriptor* socketDescriptor = lookupSocketDescriptor(env, socketNum, False);
220
221 if (socketDescriptor != NULL) socketDescriptor->setServerRequestAlternativeByteHandler(handler, clientData);
222 }
223
clearServerRequestAlternativeByteHandler(UsageEnvironment & env,int socketNum)224 void RTPInterface::clearServerRequestAlternativeByteHandler(UsageEnvironment& env, int socketNum) {
225 setServerRequestAlternativeByteHandler(env, socketNum, NULL, NULL);
226 }
227
sendPacket(unsigned char * packet,unsigned packetSize)228 Boolean RTPInterface::sendPacket(unsigned char* packet, unsigned packetSize) {
229 Boolean success = True; // we'll return False instead if any of the sends fail
230
231 // Normal case: Send as a UDP packet:
232 if (!fGS->output(envir(), packet, packetSize)) success = False;
233
234 // Also, send over each of our TCP sockets:
235 tcpStreamRecord* nextStream;
236 for (tcpStreamRecord* stream = fTCPStreams; stream != NULL; stream = nextStream) {
237 nextStream = stream->fNext; // Set this now, in case the following deletes "stream":
238 if (!sendRTPorRTCPPacketOverTCP(packet, packetSize,
239 stream->fStreamSocketNum, stream->fStreamChannelId)) {
240 success = False;
241 }
242 }
243
244 return success;
245 }
246
247 void RTPInterface
startNetworkReading(TaskScheduler::BackgroundHandlerProc * handlerProc)248 ::startNetworkReading(TaskScheduler::BackgroundHandlerProc* handlerProc) {
249 // Normal case: Arrange to read UDP packets:
250 envir().taskScheduler().
251 turnOnBackgroundReadHandling(fGS->socketNum(), handlerProc, fOwner);
252
253 // Also, receive RTP over TCP, on each of our TCP connections:
254 fReadHandlerProc = handlerProc;
255 for (tcpStreamRecord* streams = fTCPStreams; streams != NULL;
256 streams = streams->fNext) {
257 // Get a socket descriptor for "streams->fStreamSocketNum":
258 SocketDescriptor* socketDescriptor = lookupSocketDescriptor(envir(), streams->fStreamSocketNum);
259
260 // Tell it about our subChannel:
261 socketDescriptor->registerRTPInterface(streams->fStreamChannelId, this);
262 }
263 }
264
handleRead(unsigned char * buffer,unsigned bufferMaxSize,unsigned & bytesRead,struct sockaddr_storage & fromAddress,int & tcpSocketNum,unsigned char & tcpStreamChannelId,Boolean & packetReadWasIncomplete)265 Boolean RTPInterface::handleRead(unsigned char* buffer, unsigned bufferMaxSize,
266 unsigned& bytesRead, struct sockaddr_storage& fromAddress,
267 int& tcpSocketNum, unsigned char& tcpStreamChannelId,
268 Boolean& packetReadWasIncomplete) {
269 packetReadWasIncomplete = False; // by default
270 Boolean readSuccess;
271 if (fNextTCPReadStreamSocketNum < 0) {
272 // Normal case: read from the (datagram) 'groupsock':
273 tcpSocketNum = -1;
274 readSuccess = fGS->handleRead(buffer, bufferMaxSize, bytesRead, fromAddress);
275 } else {
276 // Read from the TCP connection:
277 tcpSocketNum = fNextTCPReadStreamSocketNum;
278 tcpStreamChannelId = fNextTCPReadStreamChannelId;
279
280 bytesRead = 0;
281 unsigned totBytesToRead = fNextTCPReadSize;
282 if (totBytesToRead > bufferMaxSize) totBytesToRead = bufferMaxSize;
283 unsigned curBytesToRead = totBytesToRead;
284 int curBytesRead;
285 while ((curBytesRead = readSocket(envir(), fNextTCPReadStreamSocketNum,
286 &buffer[bytesRead], curBytesToRead,
287 fromAddress)) > 0) {
288 bytesRead += curBytesRead;
289 if (bytesRead >= totBytesToRead) break;
290 curBytesToRead -= curBytesRead;
291 }
292 fNextTCPReadSize -= bytesRead;
293 if (fNextTCPReadSize == 0) {
294 // We've read all of the data that we asked for
295 readSuccess = True;
296 } else if (curBytesRead < 0) {
297 // There was an error reading the socket
298 bytesRead = 0;
299 readSuccess = False;
300 } else {
301 // We need to read more bytes, and there was not an error reading the socket
302 packetReadWasIncomplete = True;
303 return True;
304 }
305 fNextTCPReadStreamSocketNum = -1; // default, for next time
306 }
307
308 if (readSuccess && fAuxReadHandlerFunc != NULL) {
309 // Also pass the newly-read packet data to our auxilliary handler:
310 (*fAuxReadHandlerFunc)(fAuxReadHandlerClientData, buffer, bytesRead);
311 }
312 return readSuccess;
313 }
314
stopNetworkReading()315 void RTPInterface::stopNetworkReading() {
316 // Normal case
317 if (fGS != NULL) envir().taskScheduler().turnOffBackgroundReadHandling(fGS->socketNum());
318
319 // Also turn off read handling on each of our TCP connections:
320 for (tcpStreamRecord* streams = fTCPStreams; streams != NULL; streams = streams->fNext) {
321 deregisterSocket(envir(), streams->fStreamSocketNum, streams->fStreamChannelId);
322 }
323 }
324
325
326 ////////// Helper Functions - Implementation /////////
327
sendRTPorRTCPPacketOverTCP(u_int8_t * packet,unsigned packetSize,int socketNum,unsigned char streamChannelId)328 Boolean RTPInterface::sendRTPorRTCPPacketOverTCP(u_int8_t* packet, unsigned packetSize,
329 int socketNum, unsigned char streamChannelId) {
330 #ifdef DEBUG_SEND
331 fprintf(stderr, "sendRTPorRTCPPacketOverTCP: %d bytes over channel %d (socket %d)\n",
332 packetSize, streamChannelId, socketNum); fflush(stderr);
333 #endif
334 // Send a RTP/RTCP packet over TCP, using the encoding defined in RFC 2326, section 10.12:
335 // $<streamChannelId><packetSize><packet>
336 // (If the initial "send()" of '$<streamChannelId><packetSize>' succeeds, then we force
337 // the subsequent "send()" for the <packet> data to succeed, even if we have to do so with
338 // a blocking "send()".)
339 do {
340 u_int8_t framingHeader[4];
341 framingHeader[0] = '$';
342 framingHeader[1] = streamChannelId;
343 framingHeader[2] = (u_int8_t) ((packetSize&0xFF00)>>8);
344 framingHeader[3] = (u_int8_t) (packetSize&0xFF);
345 if (!sendDataOverTCP(socketNum, framingHeader, 4, False)) break;
346
347 if (!sendDataOverTCP(socketNum, packet, packetSize, True)) break;
348 #ifdef DEBUG_SEND
349 fprintf(stderr, "sendRTPorRTCPPacketOverTCP: completed\n"); fflush(stderr);
350 #endif
351
352 return True;
353 } while (0);
354
355 #ifdef DEBUG_SEND
356 fprintf(stderr, "sendRTPorRTCPPacketOverTCP: failed! (errno %d)\n", envir().getErrno()); fflush(stderr);
357 #endif
358 return False;
359 }
360
361 #ifndef RTPINTERFACE_BLOCKING_WRITE_TIMEOUT_MS
362 #define RTPINTERFACE_BLOCKING_WRITE_TIMEOUT_MS 500
363 #endif
364
sendDataOverTCP(int socketNum,u_int8_t const * data,unsigned dataSize,Boolean forceSendToSucceed)365 Boolean RTPInterface::sendDataOverTCP(int socketNum, u_int8_t const* data, unsigned dataSize, Boolean forceSendToSucceed) {
366 int sendResult = send(socketNum, (char const*)data, dataSize, 0/*flags*/);
367 if (sendResult < (int)dataSize) {
368 // The TCP send() failed - at least partially.
369
370 unsigned numBytesSentSoFar = sendResult < 0 ? 0 : (unsigned)sendResult;
371 if (numBytesSentSoFar > 0 || (forceSendToSucceed && envir().getErrno() == EAGAIN)) {
372 // The OS's TCP send buffer has filled up (because the stream's bitrate has exceeded
373 // the capacity of the TCP connection!).
374 // Force this data write to succeed, by blocking if necessary until it does:
375 unsigned numBytesRemainingToSend = dataSize - numBytesSentSoFar;
376 #ifdef DEBUG_SEND
377 fprintf(stderr, "sendDataOverTCP: resending %d-byte send (blocking)\n", numBytesRemainingToSend); fflush(stderr);
378 #endif
379 makeSocketBlocking(socketNum, RTPINTERFACE_BLOCKING_WRITE_TIMEOUT_MS);
380 sendResult = send(socketNum, (char const*)(&data[numBytesSentSoFar]), numBytesRemainingToSend, 0/*flags*/);
381 if ((unsigned)sendResult != numBytesRemainingToSend) {
382 // The blocking "send()" failed, or timed out. In either case, we assume that the
383 // TCP connection has failed (or is 'hanging' indefinitely), and we stop using it
384 // (for both RTP and RTP).
385 // (If we kept using the socket here, the RTP or RTCP packet write would be in an
386 // incomplete, inconsistent state.)
387 #ifdef DEBUG_SEND
388 fprintf(stderr, "sendDataOverTCP: blocking send() failed (delivering %d bytes out of %d); closing socket %d\n", sendResult, numBytesRemainingToSend, socketNum); fflush(stderr);
389 #endif
390 removeStreamSocket(socketNum, 0xFF);
391 return False;
392 }
393 makeSocketNonBlocking(socketNum);
394
395 return True;
396 } else if (sendResult < 0 && envir().getErrno() != EAGAIN) {
397 // Because the "send()" call failed, assume that the socket is now unusable, so stop
398 // using it (for both RTP and RTCP):
399 removeStreamSocket(socketNum, 0xFF);
400 }
401
402 return False;
403 }
404
405 return True;
406 }
407
SocketDescriptor(UsageEnvironment & env,int socketNum)408 SocketDescriptor::SocketDescriptor(UsageEnvironment& env, int socketNum)
409 :fEnv(env), fOurSocketNum(socketNum),
410 fSubChannelHashTable(HashTable::create(ONE_WORD_HASH_KEYS)),
411 fServerRequestAlternativeByteHandler(NULL), fServerRequestAlternativeByteHandlerClientData(NULL),
412 fReadErrorOccurred(False), fDeleteMyselfNext(False), fAreInReadHandlerLoop(False), fTCPReadingState(AWAITING_DOLLAR) {
413 }
414
~SocketDescriptor()415 SocketDescriptor::~SocketDescriptor() {
416 fEnv.taskScheduler().turnOffBackgroundReadHandling(fOurSocketNum);
417 removeSocketDescription(fEnv, fOurSocketNum);
418
419 if (fSubChannelHashTable != NULL) {
420 // Remove knowledge of this socket from any "RTPInterface"s that are using it:
421 HashTable::Iterator* iter = HashTable::Iterator::create(*fSubChannelHashTable);
422 RTPInterface* rtpInterface;
423 char const* key;
424
425 while ((rtpInterface = (RTPInterface*)(iter->next(key))) != NULL) {
426 u_int64_t streamChannelIdLong = (u_int64_t)key;
427 unsigned char streamChannelId = (unsigned char)streamChannelIdLong;
428
429 rtpInterface->removeStreamSocket(fOurSocketNum, streamChannelId);
430 }
431 delete iter;
432
433 // Then remove the hash table entries themselves, and then remove the hash table:
434 while (fSubChannelHashTable->RemoveNext() != NULL) {}
435 delete fSubChannelHashTable;
436 }
437
438 // Finally:
439 if (fServerRequestAlternativeByteHandler != NULL) {
440 // Hack: Pass a special character to our alternative byte handler, to tell it that either
441 // - an error occurred when reading the TCP socket, or
442 // - no error occurred, but it needs to take over control of the TCP socket once again.
443 u_int8_t specialChar = fReadErrorOccurred ? 0xFF : 0xFE;
444 (*fServerRequestAlternativeByteHandler)(fServerRequestAlternativeByteHandlerClientData, specialChar);
445 }
446 }
447
registerRTPInterface(unsigned char streamChannelId,RTPInterface * rtpInterface)448 void SocketDescriptor::registerRTPInterface(unsigned char streamChannelId,
449 RTPInterface* rtpInterface) {
450 Boolean isFirstRegistration = fSubChannelHashTable->IsEmpty();
451 #if defined(DEBUG_SEND)||defined(DEBUG_RECEIVE)
452 fprintf(stderr, "SocketDescriptor(socket %d)::registerRTPInterface(channel %d): isFirstRegistration %d\n", fOurSocketNum, streamChannelId, isFirstRegistration);
453 #endif
454 fSubChannelHashTable->Add((char const*)(long)streamChannelId,
455 rtpInterface);
456
457 if (isFirstRegistration) {
458 // Arrange to handle reads on this TCP socket:
459 TaskScheduler::BackgroundHandlerProc* handler
460 = (TaskScheduler::BackgroundHandlerProc*)&tcpReadHandler;
461 fEnv.taskScheduler().
462 setBackgroundHandling(fOurSocketNum, SOCKET_READABLE|SOCKET_EXCEPTION, handler, this);
463 }
464 }
465
466 RTPInterface* SocketDescriptor
lookupRTPInterface(unsigned char streamChannelId)467 ::lookupRTPInterface(unsigned char streamChannelId) {
468 char const* lookupArg = (char const*)(long)streamChannelId;
469 return (RTPInterface*)(fSubChannelHashTable->Lookup(lookupArg));
470 }
471
472 void SocketDescriptor
deregisterRTPInterface(unsigned char streamChannelId)473 ::deregisterRTPInterface(unsigned char streamChannelId) {
474 #if defined(DEBUG_SEND)||defined(DEBUG_RECEIVE)
475 fprintf(stderr, "SocketDescriptor(socket %d)::deregisterRTPInterface(channel %d)\n", fOurSocketNum, streamChannelId);
476 #endif
477 fSubChannelHashTable->Remove((char const*)(long)streamChannelId);
478
479 if (fSubChannelHashTable->IsEmpty()) {
480 // No more interfaces are using us, so it's curtains for us now:
481 if (fAreInReadHandlerLoop) {
482 fDeleteMyselfNext = True; // we can't delete ourself yet, but we'll do so from "tcpReadHandler()" below
483 } else {
484 delete this;
485 }
486 }
487 }
488
tcpReadHandler(SocketDescriptor * socketDescriptor,int mask)489 void SocketDescriptor::tcpReadHandler(SocketDescriptor* socketDescriptor, int mask) {
490 // Call the read handler until it returns false, with a limit to avoid starving other sockets
491 unsigned count = 2000;
492 socketDescriptor->fAreInReadHandlerLoop = True;
493 while (!socketDescriptor->fDeleteMyselfNext && socketDescriptor->tcpReadHandler1(mask) && --count > 0) {}
494 socketDescriptor->fAreInReadHandlerLoop = False;
495 if (socketDescriptor->fDeleteMyselfNext) delete socketDescriptor;
496 }
497
tcpReadHandler1(int mask)498 Boolean SocketDescriptor::tcpReadHandler1(int mask) {
499 // We expect the following data over the TCP channel:
500 // optional RTSP command or response bytes (before the first '$' character)
501 // a '$' character
502 // a 1-byte channel id
503 // a 2-byte packet size (in network byte order)
504 // the packet data.
505 // However, because the socket is being read asynchronously, this data might arrive in pieces.
506
507 u_int8_t c;
508 struct sockaddr_storage fromAddress;
509 if (fTCPReadingState != AWAITING_PACKET_DATA) {
510 int result = readSocket(fEnv, fOurSocketNum, &c, 1, fromAddress);
511 if (result == 0) { // There was no more data to read
512 return False;
513 } else if (result != 1) { // error reading TCP socket, so we will no longer handle it
514 #ifdef DEBUG_RECEIVE
515 fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): readSocket(1 byte) returned %d (error)\n", fOurSocketNum, result);
516 #endif
517 fReadErrorOccurred = True;
518 fDeleteMyselfNext = True;
519 return False;
520 }
521 }
522
523 Boolean callAgain = True;
524 switch (fTCPReadingState) {
525 case AWAITING_DOLLAR: {
526 if (c == '$') {
527 #ifdef DEBUG_RECEIVE
528 fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): Saw '$'\n", fOurSocketNum);
529 #endif
530 fTCPReadingState = AWAITING_STREAM_CHANNEL_ID;
531 } else {
532 // This character is part of a RTSP request or command, which is handled separately:
533 if (fServerRequestAlternativeByteHandler != NULL && c != 0xFF && c != 0xFE) {
534 // Hack: 0xFF and 0xFE are used as special signaling characters, so don't send them
535 (*fServerRequestAlternativeByteHandler)(fServerRequestAlternativeByteHandlerClientData, c);
536 }
537 }
538 break;
539 }
540 case AWAITING_STREAM_CHANNEL_ID: {
541 // The byte that we read is the stream channel id.
542 if (lookupRTPInterface(c) != NULL) { // sanity check
543 fStreamChannelId = c;
544 fTCPReadingState = AWAITING_SIZE1;
545 } else {
546 // This wasn't a stream channel id that we expected. We're (somehow) in a strange state. Try to recover:
547 #ifdef DEBUG_RECEIVE
548 fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): Saw nonexistent stream channel id: 0x%02x\n", fOurSocketNum, c);
549 #endif
550 fTCPReadingState = AWAITING_DOLLAR;
551 }
552 break;
553 }
554 case AWAITING_SIZE1: {
555 // The byte that we read is the first (high) byte of the 16-bit RTP or RTCP packet 'size'.
556 fSizeByte1 = c;
557 fTCPReadingState = AWAITING_SIZE2;
558 break;
559 }
560 case AWAITING_SIZE2: {
561 // The byte that we read is the second (low) byte of the 16-bit RTP or RTCP packet 'size'.
562 unsigned short size = (fSizeByte1<<8)|c;
563
564 // Record the information about the packet data that will be read next:
565 RTPInterface* rtpInterface = lookupRTPInterface(fStreamChannelId);
566 if (rtpInterface != NULL) {
567 rtpInterface->fNextTCPReadSize = size;
568 rtpInterface->fNextTCPReadStreamSocketNum = fOurSocketNum;
569 rtpInterface->fNextTCPReadStreamChannelId = fStreamChannelId;
570 }
571 fTCPReadingState = AWAITING_PACKET_DATA;
572 break;
573 }
574 case AWAITING_PACKET_DATA: {
575 callAgain = False;
576 fTCPReadingState = AWAITING_DOLLAR; // the next state, unless we end up having to read more data in the current state
577 // Call the appropriate read handler to get the packet data from the TCP stream:
578 RTPInterface* rtpInterface = lookupRTPInterface(fStreamChannelId);
579 if (rtpInterface != NULL) {
580 if (rtpInterface->fNextTCPReadSize == 0) {
581 // We've already read all the data for this packet.
582 break;
583 }
584 if (rtpInterface->fReadHandlerProc != NULL) {
585 #ifdef DEBUG_RECEIVE
586 fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): reading %d bytes on channel %d\n", fOurSocketNum, rtpInterface->fNextTCPReadSize, rtpInterface->fNextTCPReadStreamChannelId);
587 #endif
588 fTCPReadingState = AWAITING_PACKET_DATA;
589 rtpInterface->fReadHandlerProc(rtpInterface->fOwner, mask);
590 } else {
591 #ifdef DEBUG_RECEIVE
592 fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): No handler proc for \"rtpInterface\" for channel %d; need to skip %d remaining bytes\n", fOurSocketNum, fStreamChannelId, rtpInterface->fNextTCPReadSize);
593 #endif
594 int result = readSocket(fEnv, fOurSocketNum, &c, 1, fromAddress);
595 if (result < 0) { // error reading TCP socket, so we will no longer handle it
596 #ifdef DEBUG_RECEIVE
597 fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): readSocket(1 byte) returned %d (error)\n", fOurSocketNum, result);
598 #endif
599 fReadErrorOccurred = True;
600 fDeleteMyselfNext = True;
601 return False;
602 } else {
603 fTCPReadingState = AWAITING_PACKET_DATA;
604 if (result == 1) {
605 --rtpInterface->fNextTCPReadSize;
606 callAgain = True;
607 }
608 }
609 }
610 }
611 #ifdef DEBUG_RECEIVE
612 else fprintf(stderr, "SocketDescriptor(socket %d)::tcpReadHandler(): No \"rtpInterface\" for channel %d\n", fOurSocketNum, fStreamChannelId);
613 #endif
614 }
615 }
616
617 return callAgain;
618 }
619
620
621 ////////// tcpStreamRecord implementation //////////
622
623 tcpStreamRecord
tcpStreamRecord(int streamSocketNum,unsigned char streamChannelId,tcpStreamRecord * next)624 ::tcpStreamRecord(int streamSocketNum, unsigned char streamChannelId,
625 tcpStreamRecord* next)
626 : fNext(next),
627 fStreamSocketNum(streamSocketNum), fStreamChannelId(streamChannelId) {
628 }
629
~tcpStreamRecord()630 tcpStreamRecord::~tcpStreamRecord() {
631 delete fNext;
632 }
633