1 /**********
2 This library is free software; you can redistribute it and/or modify it under
3 the terms of the GNU Lesser General Public License as published by the
4 Free Software Foundation; either version 3 of the License, or (at your
5 option) any later version. (See <http://www.gnu.org/copyleft/lesser.html>.)
6 
7 This library is distributed in the hope that it will be useful, but WITHOUT
8 ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
9 FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public License for
10 more details.
11 
12 You should have received a copy of the GNU Lesser General Public License
13 along with this library; if not, write to the Free Software Foundation, Inc.,
14 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA
15 **********/
16 // "liveMedia"
17 // Copyright (c) 1996-2020 Live Networks, Inc.  All rights reserved.
18 // An class that can be used to create (possibly multiple) 'replicas' of an incoming stream.
19 // Implementation.
20 
21 #include "StreamReplicator.hh"
22 
23 ////////// Definition of "StreamReplica": The class that implements each stream replica //////////
24 
25 class StreamReplica: public FramedSource {
26 protected:
27   friend class StreamReplicator;
28   StreamReplica(StreamReplicator& ourReplicator); // called only by "StreamReplicator::createStreamReplica()"
29   virtual ~StreamReplica();
30 
31 private: // redefined virtual functions:
32   virtual void doGetNextFrame();
33   virtual void doStopGettingFrames();
34 
35 private:
36   static void copyReceivedFrame(StreamReplica* toReplica, StreamReplica* fromReplica);
37 
38 private:
39   StreamReplicator& fOurReplicator;
40   int fFrameIndex; // 0 or 1, depending upon which frame we're currently requesting; could also be -1 if we've stopped playing
41 
42   // Replicas that are currently awaiting data are kept in a (singly-linked) list:
43   StreamReplica* fNext;
44 };
45 
46 
47 ////////// StreamReplicator implementation //////////
48 
createNew(UsageEnvironment & env,FramedSource * inputSource,Boolean deleteWhenLastReplicaDies)49 StreamReplicator* StreamReplicator::createNew(UsageEnvironment& env, FramedSource* inputSource, Boolean deleteWhenLastReplicaDies) {
50   return new StreamReplicator(env, inputSource, deleteWhenLastReplicaDies);
51 }
52 
StreamReplicator(UsageEnvironment & env,FramedSource * inputSource,Boolean deleteWhenLastReplicaDies)53 StreamReplicator::StreamReplicator(UsageEnvironment& env, FramedSource* inputSource, Boolean deleteWhenLastReplicaDies)
54   : Medium(env),
55     fInputSource(inputSource), fDeleteWhenLastReplicaDies(deleteWhenLastReplicaDies), fInputSourceHasClosed(False),
56     fNumReplicas(0), fNumActiveReplicas(0), fNumDeliveriesMadeSoFar(0),
57     fFrameIndex(0), fPrimaryReplica(NULL), fReplicasAwaitingCurrentFrame(NULL), fReplicasAwaitingNextFrame(NULL) {
58 }
59 
~StreamReplicator()60 StreamReplicator::~StreamReplicator() {
61   Medium::close(fInputSource);
62 }
63 
createStreamReplica()64 FramedSource* StreamReplicator::createStreamReplica() {
65   ++fNumReplicas;
66   return new StreamReplica(*this);
67 }
68 
getNextFrame(StreamReplica * replica)69 void StreamReplicator::getNextFrame(StreamReplica* replica) {
70   if (fInputSourceHasClosed) { // handle closure instead
71     replica->handleClosure();
72     return;
73   }
74 
75   if (replica->fFrameIndex == -1) {
76     // This replica had stopped playing (or had just been created), but is now actively reading.  Note this:
77     replica->fFrameIndex = fFrameIndex;
78     ++fNumActiveReplicas;
79   }
80 
81   if (fPrimaryReplica == NULL) {
82     // This is the first replica to request the next unread frame.  Make it the 'primary' replica - meaning that we read the frame
83     // into its buffer, and then copy from this into the other replicas' buffers.
84     fPrimaryReplica = replica;
85 
86     // Arrange to read the next frame into this replica's buffer:
87     if (fInputSource != NULL) fInputSource->getNextFrame(fPrimaryReplica->fTo, fPrimaryReplica->fMaxSize,
88 							 afterGettingFrame, this, onSourceClosure, this);
89   } else if (replica->fFrameIndex != fFrameIndex) {
90     // This replica is already asking for the next frame (because it has already received the current frame).  Enqueue it:
91     replica->fNext = fReplicasAwaitingNextFrame;
92     fReplicasAwaitingNextFrame = replica;
93   } else {
94     // This replica is asking for the current frame.  Enqueue it:
95     replica->fNext = fReplicasAwaitingCurrentFrame;
96     fReplicasAwaitingCurrentFrame = replica;
97 
98     if (fInputSource != NULL && !fInputSource->isCurrentlyAwaitingData()) {
99       // The current frame has already arrived, so deliver it to this replica now:
100       deliverReceivedFrame();
101     }
102   }
103 }
104 
deactivateStreamReplica(StreamReplica * replicaBeingDeactivated)105 void StreamReplicator::deactivateStreamReplica(StreamReplica* replicaBeingDeactivated) {
106   if (replicaBeingDeactivated->fFrameIndex == -1) return; // this replica has already been deactivated (or was never activated at all)
107 
108   // Assert: fNumActiveReplicas > 0
109   if (fNumActiveReplicas == 0) fprintf(stderr, "StreamReplicator::deactivateStreamReplica() Internal Error!\n"); // should not happen
110   --fNumActiveReplicas;
111 
112   // Forget about any frame delivery that might have just been made to this replica:
113   if (replicaBeingDeactivated->fFrameIndex != fFrameIndex && fNumDeliveriesMadeSoFar > 0) --fNumDeliveriesMadeSoFar;
114 
115   replicaBeingDeactivated->fFrameIndex = -1;
116 
117   // Check whether the replica being deactivated is the 'primary' replica, or is enqueued awaiting a frame:
118   if (replicaBeingDeactivated == fPrimaryReplica) {
119     // We need to replace the 'primary replica', if we can:
120     if (fReplicasAwaitingCurrentFrame == NULL) {
121       // There's currently no replacement 'primary replica'
122       fPrimaryReplica = NULL;
123     } else {
124       // There's another replica that we can use as a replacement 'primary replica':
125       fPrimaryReplica = fReplicasAwaitingCurrentFrame;
126       fReplicasAwaitingCurrentFrame = fReplicasAwaitingCurrentFrame->fNext;
127       fPrimaryReplica->fNext = NULL;
128     }
129 
130     // Check whether the read into the old primary replica's buffer is still pending, or has completed:
131     if (fInputSource != NULL) {
132       if (fInputSource->isCurrentlyAwaitingData()) {
133 	// We have a pending read into the old primary replica's buffer.
134 	// We need to stop it, and retry the read with a new primary (if available)
135 	fInputSource->stopGettingFrames();
136 
137 	if (fPrimaryReplica != NULL) {
138 	  fInputSource->getNextFrame(fPrimaryReplica->fTo, fPrimaryReplica->fMaxSize,
139 				     afterGettingFrame, this, onSourceClosure, this);
140 	}
141       } else {
142 	// The read into the old primary replica's buffer has already completed.  Copy the data to the new primary replica (if any):
143 	if (fPrimaryReplica != NULL) {
144 	  StreamReplica::copyReceivedFrame(fPrimaryReplica, replicaBeingDeactivated);
145 	} else {
146 	  // We don't have a new primary replica, so we can't copy the received frame to any new replica that might ask for it.
147 	  // Fortunately this should be a very rare occurrence.
148 	}
149       }
150     }
151   } else {
152     // The replica that's being removed was not our 'primary replica', but make sure it's not on either of our queues:
153     if (fReplicasAwaitingCurrentFrame != NULL) {
154       if (replicaBeingDeactivated == fReplicasAwaitingCurrentFrame) {
155 	fReplicasAwaitingCurrentFrame = replicaBeingDeactivated->fNext;
156 	replicaBeingDeactivated->fNext = NULL;
157       }
158       else {
159 	for (StreamReplica* r1 = fReplicasAwaitingCurrentFrame; r1->fNext != NULL; r1 = r1->fNext) {
160 	  if (r1->fNext == replicaBeingDeactivated) {
161 	    r1->fNext = replicaBeingDeactivated->fNext;
162 	    replicaBeingDeactivated->fNext = NULL;
163 	    break;
164 	  }
165 	}
166       }
167     }
168     if (fReplicasAwaitingNextFrame != NULL) {
169       if (replicaBeingDeactivated == fReplicasAwaitingNextFrame) {
170 	fReplicasAwaitingNextFrame = replicaBeingDeactivated->fNext;
171 	replicaBeingDeactivated->fNext = NULL;
172       }
173       else {
174 	for (StreamReplica* r2 = fReplicasAwaitingNextFrame; r2->fNext != NULL; r2 = r2->fNext) {
175 	  if (r2->fNext == replicaBeingDeactivated) {
176 	    r2->fNext = replicaBeingDeactivated->fNext;
177 	    replicaBeingDeactivated->fNext = NULL;
178 	    break;
179 	  }
180 	}
181       }
182     }
183 
184     // Check for the possibility that - now that a replica has been deactivated - all other
185     // replicas have received the current frame, and so now we need to complete delivery to
186     // the primary replica:
187     if (fPrimaryReplica != NULL && fInputSource != NULL && !fInputSource->isCurrentlyAwaitingData()) deliverReceivedFrame();
188   }
189 
190   if (fNumActiveReplicas == 0 && fInputSource != NULL) fInputSource->stopGettingFrames(); // tell our source to stop too
191 }
192 
removeStreamReplica(StreamReplica * replicaBeingRemoved)193 void StreamReplicator::removeStreamReplica(StreamReplica* replicaBeingRemoved) {
194   // First, handle the replica that's being removed the same way that we would if it were merely being deactivated:
195   deactivateStreamReplica(replicaBeingRemoved);
196 
197   // Assert: fNumReplicas > 0
198   if (fNumReplicas == 0) fprintf(stderr, "StreamReplicator::removeStreamReplica() Internal Error!\n"); // should not happen
199   --fNumReplicas;
200 
201   // If this was the last replica, then delete ourselves (if we were set up to do so):
202   if (fNumReplicas == 0 && fDeleteWhenLastReplicaDies) {
203     Medium::close(this);
204     return;
205   }
206 }
207 
afterGettingFrame(void * clientData,unsigned frameSize,unsigned numTruncatedBytes,struct timeval presentationTime,unsigned durationInMicroseconds)208 void StreamReplicator::afterGettingFrame(void* clientData, unsigned frameSize, unsigned numTruncatedBytes,
209 					 struct timeval presentationTime, unsigned durationInMicroseconds) {
210   ((StreamReplicator*)clientData)->afterGettingFrame(frameSize, numTruncatedBytes, presentationTime, durationInMicroseconds);
211 }
212 
afterGettingFrame(unsigned frameSize,unsigned numTruncatedBytes,struct timeval presentationTime,unsigned durationInMicroseconds)213 void StreamReplicator::afterGettingFrame(unsigned frameSize, unsigned numTruncatedBytes,
214 					 struct timeval presentationTime, unsigned durationInMicroseconds) {
215   // The frame was read into our primary replica's buffer.  Update the primary replica's state, but don't complete delivery to it
216   // just yet.  We do that later, after we're sure that we've delivered it to all other replicas.
217   fPrimaryReplica->fFrameSize = frameSize;
218   fPrimaryReplica->fNumTruncatedBytes = numTruncatedBytes;
219   fPrimaryReplica->fPresentationTime = presentationTime;
220   fPrimaryReplica->fDurationInMicroseconds = durationInMicroseconds;
221 
222   deliverReceivedFrame();
223 }
224 
onSourceClosure(void * clientData)225 void StreamReplicator::onSourceClosure(void* clientData) {
226   ((StreamReplicator*)clientData)->onSourceClosure();
227 }
228 
onSourceClosure()229 void StreamReplicator::onSourceClosure() {
230   fInputSourceHasClosed = True;
231 
232   // Signal the closure to each replica that is currently awaiting a frame:
233   StreamReplica* replica;
234   while ((replica = fReplicasAwaitingCurrentFrame) != NULL) {
235     fReplicasAwaitingCurrentFrame = replica->fNext;
236     replica->fNext = NULL;
237     replica->handleClosure();
238   }
239   while ((replica = fReplicasAwaitingNextFrame) != NULL) {
240     fReplicasAwaitingNextFrame = replica->fNext;
241     replica->fNext = NULL;
242     replica->handleClosure();
243   }
244   if ((replica = fPrimaryReplica) != NULL) {
245     fPrimaryReplica = NULL;
246     replica->handleClosure();
247   }
248 }
249 
deliverReceivedFrame()250 void StreamReplicator::deliverReceivedFrame() {
251   // The 'primary replica' has received its copy of the current frame.
252   // Copy it (and complete delivery) to any other replica that has requested this frame.
253   // Then, if no more requests for this frame are expected, complete delivery to the 'primary replica' itself.
254   StreamReplica* replica;
255   while ((replica = fReplicasAwaitingCurrentFrame) != NULL) {
256     fReplicasAwaitingCurrentFrame = replica->fNext;
257     replica->fNext = NULL;
258 
259     // Assert: fPrimaryReplica != NULL
260     if (fPrimaryReplica == NULL) fprintf(stderr, "StreamReplicator::deliverReceivedFrame() Internal Error 1!\n"); // shouldn't happen
261     StreamReplica::copyReceivedFrame(replica, fPrimaryReplica);
262     replica->fFrameIndex = 1 - replica->fFrameIndex; // toggle it (0<->1), because this replica no longer awaits the current frame
263     ++fNumDeliveriesMadeSoFar;
264 
265     // Assert: fNumDeliveriesMadeSoFar < fNumActiveReplicas; // because we still have the 'primary replica' to deliver to
266     if (!(fNumDeliveriesMadeSoFar < fNumActiveReplicas)) fprintf(stderr, "StreamReplicator::deliverReceivedFrame() Internal Error 2(%d,%d)!\n", fNumDeliveriesMadeSoFar, fNumActiveReplicas); // should not happen
267 
268     // Complete delivery to this replica:
269     FramedSource::afterGetting(replica);
270   }
271 
272   if (fNumDeliveriesMadeSoFar == fNumActiveReplicas - 1 && fPrimaryReplica != NULL) {
273     // No more requests for this frame are expected, so complete delivery to the 'primary replica':
274     replica = fPrimaryReplica;
275     fPrimaryReplica = NULL;
276     replica->fFrameIndex = 1 - replica->fFrameIndex; // toggle it (0<->1), because this replica no longer awaits the current frame
277     fFrameIndex = 1 - fFrameIndex; // toggle it (0<->1) for the next frame
278     fNumDeliveriesMadeSoFar = 0; // reset for the next frame
279 
280     if (fReplicasAwaitingNextFrame != NULL) {
281       // One of the other replicas has already requested the next frame, so make it the next 'primary replica':
282       fPrimaryReplica = fReplicasAwaitingNextFrame;
283       fReplicasAwaitingNextFrame = fReplicasAwaitingNextFrame->fNext;
284       fPrimaryReplica->fNext = NULL;
285 
286       // Arrange to read the next frame into this replica's buffer:
287       if (fInputSource != NULL) fInputSource->getNextFrame(fPrimaryReplica->fTo, fPrimaryReplica->fMaxSize,
288 							   afterGettingFrame, this, onSourceClosure, this);
289     }
290 
291     // Move any other replicas that had already requested the next frame to the 'requesting current frame' list:
292     // Assert: fReplicasAwaitingCurrentFrame == NULL;
293     if (!(fReplicasAwaitingCurrentFrame == NULL)) fprintf(stderr, "StreamReplicator::deliverReceivedFrame() Internal Error 3!\n"); // should not happen
294     fReplicasAwaitingCurrentFrame = fReplicasAwaitingNextFrame;
295     fReplicasAwaitingNextFrame = NULL;
296 
297     // Complete delivery to the 'primary' replica (thereby completing all deliveries for this frame):
298     FramedSource::afterGetting(replica);
299   }
300 }
301 
302 
303 ////////// StreamReplica implementation //////////
304 
StreamReplica(StreamReplicator & ourReplicator)305 StreamReplica::StreamReplica(StreamReplicator& ourReplicator)
306   : FramedSource(ourReplicator.envir()),
307     fOurReplicator(ourReplicator),
308     fFrameIndex(-1/*we haven't started playing yet*/), fNext(NULL) {
309 }
310 
~StreamReplica()311 StreamReplica::~StreamReplica() {
312   fOurReplicator.removeStreamReplica(this);
313 }
314 
doGetNextFrame()315 void StreamReplica::doGetNextFrame() {
316   fOurReplicator.getNextFrame(this);
317 }
318 
doStopGettingFrames()319 void StreamReplica::doStopGettingFrames() {
320   fOurReplicator.deactivateStreamReplica(this);
321 }
322 
copyReceivedFrame(StreamReplica * toReplica,StreamReplica * fromReplica)323 void StreamReplica::copyReceivedFrame(StreamReplica* toReplica, StreamReplica* fromReplica) {
324   // First, figure out how much data to copy.  ("toReplica" might have a smaller buffer than "fromReplica".)
325   unsigned numNewBytesToTruncate
326     = toReplica->fMaxSize < fromReplica->fFrameSize ? fromReplica->fFrameSize - toReplica->fMaxSize : 0;
327   toReplica->fFrameSize = fromReplica->fFrameSize - numNewBytesToTruncate;
328   toReplica->fNumTruncatedBytes = fromReplica->fNumTruncatedBytes + numNewBytesToTruncate;
329 
330   memmove(toReplica->fTo, fromReplica->fTo, toReplica->fFrameSize);
331   toReplica->fPresentationTime = fromReplica->fPresentationTime;
332   toReplica->fDurationInMicroseconds = fromReplica->fDurationInMicroseconds;
333 }
334