1 /* -*- Mode: C++; tab-width: 2; indent-tabs-mode: nil; c-basic-offset: 2 -*- */
2 /* vim:set ts=2 sw=2 sts=2 et cindent: */
3 /* This Source Code Form is subject to the terms of the Mozilla Public
4  * License, v. 2.0. If a copy of the MPL was not distributed with this
5  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6 
7 #include "MediaCache.h"
8 
9 #include "ChannelMediaResource.h"
10 #include "FileBlockCache.h"
11 #include "MediaBlockCacheBase.h"
12 #include "MediaResource.h"
13 #include "MemoryBlockCache.h"
14 #include "mozilla/Attributes.h"
15 #include "mozilla/ClearOnShutdown.h"
16 #include "mozilla/ErrorNames.h"
17 #include "mozilla/Logging.h"
18 #include "mozilla/Monitor.h"
19 #include "mozilla/Preferences.h"
20 #include "mozilla/Services.h"
21 #include "mozilla/StaticPtr.h"
22 #include "mozilla/StaticPrefs_browser.h"
23 #include "mozilla/StaticPrefs_media.h"
24 #include "mozilla/Telemetry.h"
25 #include "nsContentUtils.h"
26 #include "nsINetworkLinkService.h"
27 #include "nsIObserverService.h"
28 #include "nsPrintfCString.h"
29 #include "nsProxyRelease.h"
30 #include "nsTHashSet.h"
31 #include "nsThreadUtils.h"
32 #include "prio.h"
33 #include "VideoUtils.h"
34 #include <algorithm>
35 
36 namespace mozilla {
37 
38 #undef LOG
39 #undef LOGI
40 #undef LOGE
41 LazyLogModule gMediaCacheLog("MediaCache");
42 #define LOG(...) MOZ_LOG(gMediaCacheLog, LogLevel::Debug, (__VA_ARGS__))
43 #define LOGI(...) MOZ_LOG(gMediaCacheLog, LogLevel::Info, (__VA_ARGS__))
44 #define LOGE(...)                                                              \
45   NS_DebugBreak(NS_DEBUG_WARNING, nsPrintfCString(__VA_ARGS__).get(), nullptr, \
46                 __FILE__, __LINE__)
47 
48 // For HTTP seeking, if number of bytes needing to be
49 // seeked forward is less than this value then a read is
50 // done rather than a byte range request.
51 //
52 // If we assume a 100Mbit connection, and assume reissuing an HTTP seek causes
53 // a delay of 200ms, then in that 200ms we could have simply read ahead 2MB. So
54 // setting SEEK_VS_READ_THRESHOLD to 1MB sounds reasonable.
55 static const int64_t SEEK_VS_READ_THRESHOLD = 1 * 1024 * 1024;
56 
57 // Readahead blocks for non-seekable streams will be limited to this
58 // fraction of the cache space. We don't normally evict such blocks
59 // because replacing them requires a seek, but we need to make sure
60 // they don't monopolize the cache.
61 static const double NONSEEKABLE_READAHEAD_MAX = 0.5;
62 
63 // Data N seconds before the current playback position is given the same
64 // priority as data REPLAY_PENALTY_FACTOR*N seconds ahead of the current
65 // playback position. REPLAY_PENALTY_FACTOR is greater than 1 to reflect that
66 // data in the past is less likely to be played again than data in the future.
67 // We want to give data just behind the current playback position reasonably
68 // high priority in case codecs need to retrieve that data (e.g. because
69 // tracks haven't been muxed well or are being decoded at uneven rates).
70 // 1/REPLAY_PENALTY_FACTOR as much data will be kept behind the
71 // current playback position as will be kept ahead of the current playback
72 // position.
73 static const uint32_t REPLAY_PENALTY_FACTOR = 3;
74 
75 // When looking for a reusable block, scan forward this many blocks
76 // from the desired "best" block location to look for free blocks,
77 // before we resort to scanning the whole cache. The idea is to try to
78 // store runs of stream blocks close-to-consecutively in the cache if we
79 // can.
80 static const uint32_t FREE_BLOCK_SCAN_LIMIT = 16;
81 
82 #ifdef DEBUG
83 // Turn this on to do very expensive cache state validation
84 // #define DEBUG_VERIFY_CACHE
85 #endif
86 
87 class MediaCacheFlusher final : public nsIObserver,
88                                 public nsSupportsWeakReference {
89  public:
90   NS_DECL_ISUPPORTS
91   NS_DECL_NSIOBSERVER
92 
93   static void RegisterMediaCache(MediaCache* aMediaCache);
94   static void UnregisterMediaCache(MediaCache* aMediaCache);
95 
96  private:
97   MediaCacheFlusher() = default;
98   ~MediaCacheFlusher() = default;
99 
100   // Singleton instance created when a first MediaCache is registered, and
101   // released when the last MediaCache is unregistered.
102   // The observer service will keep a weak reference to it, for notifications.
103   static StaticRefPtr<MediaCacheFlusher> gMediaCacheFlusher;
104 
105   nsTArray<MediaCache*> mMediaCaches;
106 };
107 
108 /* static */
109 StaticRefPtr<MediaCacheFlusher> MediaCacheFlusher::gMediaCacheFlusher;
110 
NS_IMPL_ISUPPORTS(MediaCacheFlusher,nsIObserver,nsISupportsWeakReference)111 NS_IMPL_ISUPPORTS(MediaCacheFlusher, nsIObserver, nsISupportsWeakReference)
112 
113 /* static */
114 void MediaCacheFlusher::RegisterMediaCache(MediaCache* aMediaCache) {
115   NS_ASSERTION(NS_IsMainThread(), "Only call on main thread");
116 
117   if (!gMediaCacheFlusher) {
118     gMediaCacheFlusher = new MediaCacheFlusher();
119     nsCOMPtr<nsIObserverService> observerService =
120         mozilla::services::GetObserverService();
121     if (observerService) {
122       observerService->AddObserver(gMediaCacheFlusher, "last-pb-context-exited",
123                                    true);
124       observerService->AddObserver(gMediaCacheFlusher,
125                                    "cacheservice:empty-cache", true);
126       observerService->AddObserver(
127           gMediaCacheFlusher, "contentchild:network-link-type-changed", true);
128       observerService->AddObserver(gMediaCacheFlusher,
129                                    NS_NETWORK_LINK_TYPE_TOPIC, true);
130     }
131   }
132 
133   gMediaCacheFlusher->mMediaCaches.AppendElement(aMediaCache);
134 }
135 
136 /* static */
UnregisterMediaCache(MediaCache * aMediaCache)137 void MediaCacheFlusher::UnregisterMediaCache(MediaCache* aMediaCache) {
138   NS_ASSERTION(NS_IsMainThread(), "Only call on main thread");
139 
140   gMediaCacheFlusher->mMediaCaches.RemoveElement(aMediaCache);
141 
142   if (gMediaCacheFlusher->mMediaCaches.Length() == 0) {
143     gMediaCacheFlusher = nullptr;
144   }
145 }
146 
147 class MediaCache {
148   using AutoLock = MonitorAutoLock;
149 
150  public:
151   NS_INLINE_DECL_THREADSAFE_REFCOUNTING(MediaCache)
152 
153   friend class MediaCacheStream::BlockList;
154   typedef MediaCacheStream::BlockList BlockList;
155   static const int64_t BLOCK_SIZE = MediaCacheStream::BLOCK_SIZE;
156 
157   // Get an instance of a MediaCache (or nullptr if initialization failed).
158   // aContentLength is the content length if known already, otherwise -1.
159   // If the length is known and considered small enough, a discrete MediaCache
160   // with memory backing will be given. Otherwise the one MediaCache with
161   // file backing will be provided.
162   // If aIsPrivateBrowsing is true, only initialization of a memory backed
163   // MediaCache will be attempted, returning nullptr if that fails.
164   static RefPtr<MediaCache> GetMediaCache(int64_t aContentLength,
165                                           bool aIsPrivateBrowsing);
166 
OwnerThread() const167   nsISerialEventTarget* OwnerThread() const { return sThread; }
168 
169   // Brutally flush the cache contents. Main thread only.
170   void Flush();
171 
172   // Close all streams associated with private browsing windows. This will
173   // also remove the blocks from the cache since we don't want to leave any
174   // traces when PB is done.
175   void CloseStreamsForPrivateBrowsing();
176 
177   // Cache-file access methods. These are the lowest-level cache methods.
178   // mMonitor must be held; these can be called on any thread.
179   // This can return partial reads.
180   // Note mMonitor will be dropped while doing IO. The caller need
181   // to handle changes happening when the monitor is not held.
182   nsresult ReadCacheFile(AutoLock&, int64_t aOffset, void* aData,
183                          int32_t aLength, int32_t* aBytes);
184 
185   // The generated IDs are always positive.
AllocateResourceID(AutoLock &)186   int64_t AllocateResourceID(AutoLock&) { return ++mNextResourceID; }
187 
188   // mMonitor must be held, called on main thread.
189   // These methods are used by the stream to set up and tear down streams,
190   // and to handle reads and writes.
191   // Add aStream to the list of streams.
192   void OpenStream(AutoLock&, MediaCacheStream* aStream, bool aIsClone = false);
193   // Remove aStream from the list of streams.
194   void ReleaseStream(AutoLock&, MediaCacheStream* aStream);
195   // Free all blocks belonging to aStream.
196   void ReleaseStreamBlocks(AutoLock&, MediaCacheStream* aStream);
197   // Find a cache entry for this data, and write the data into it
198   void AllocateAndWriteBlock(
199       AutoLock&, MediaCacheStream* aStream, int32_t aStreamBlockIndex,
200       Span<const uint8_t> aData1,
201       Span<const uint8_t> aData2 = Span<const uint8_t>());
202 
203   // mMonitor must be held; can be called on any thread
204   // Notify the cache that a seek has been requested. Some blocks may
205   // need to change their class between PLAYED_BLOCK and READAHEAD_BLOCK.
206   // This does not trigger channel seeks directly, the next Update()
207   // will do that if necessary. The caller will call QueueUpdate().
208   void NoteSeek(AutoLock&, MediaCacheStream* aStream, int64_t aOldOffset);
209   // Notify the cache that a block has been read from. This is used
210   // to update last-use times. The block may not actually have a
211   // cache entry yet since Read can read data from a stream's
212   // in-memory mPartialBlockBuffer while the block is only partly full,
213   // and thus hasn't yet been committed to the cache. The caller will
214   // call QueueUpdate().
215   void NoteBlockUsage(AutoLock&, MediaCacheStream* aStream, int32_t aBlockIndex,
216                       int64_t aStreamOffset, MediaCacheStream::ReadMode aMode,
217                       TimeStamp aNow);
218   // Mark aStream as having the block, adding it as an owner.
219   void AddBlockOwnerAsReadahead(AutoLock&, int32_t aBlockIndex,
220                                 MediaCacheStream* aStream,
221                                 int32_t aStreamBlockIndex);
222 
223   // This queues a call to Update() on the media cache thread.
224   void QueueUpdate(AutoLock&);
225 
226   // Notify all streams for the resource ID that the suspended status changed
227   // at the end of MediaCache::Update.
228   void QueueSuspendedStatusUpdate(AutoLock&, int64_t aResourceID);
229 
230   // Updates the cache state asynchronously on the media cache thread:
231   // -- try to trim the cache back to its desired size, if necessary
232   // -- suspend channels that are going to read data that's lower priority
233   // than anything currently cached
234   // -- resume channels that are going to read data that's higher priority
235   // than something currently cached
236   // -- seek channels that need to seek to a new location
237   void Update();
238 
239 #ifdef DEBUG_VERIFY_CACHE
240   // Verify invariants, especially block list invariants
241   void Verify(AutoLock&);
242 #else
Verify(AutoLock &)243   void Verify(AutoLock&) {}
244 #endif
245 
Monitor()246   mozilla::Monitor& Monitor() {
247     // This method should only be called outside the main thread.
248     // The MOZ_DIAGNOSTIC_ASSERT(!NS_IsMainThread()) assertion should be
249     // re-added as part of bug 1464045
250     return mMonitor;
251   }
252 
253   // Polls whether we're on a cellular network connection, and posts a task
254   // to the MediaCache thread to set the value of MediaCache::sOnCellular.
255   // Call on main thread only.
256   static void UpdateOnCellular();
257 
258   /**
259    * An iterator that makes it easy to iterate through all streams that
260    * have a given resource ID and are not closed.
261    * Must be used while holding the media cache lock.
262    */
263   class ResourceStreamIterator {
264    public:
ResourceStreamIterator(MediaCache * aMediaCache,int64_t aResourceID)265     ResourceStreamIterator(MediaCache* aMediaCache, int64_t aResourceID)
266         : mMediaCache(aMediaCache), mResourceID(aResourceID), mNext(0) {
267       aMediaCache->mMonitor.AssertCurrentThreadOwns();
268     }
Next(AutoLock & aLock)269     MediaCacheStream* Next(AutoLock& aLock) {
270       while (mNext < mMediaCache->mStreams.Length()) {
271         MediaCacheStream* stream = mMediaCache->mStreams[mNext];
272         ++mNext;
273         if (stream->GetResourceID() == mResourceID && !stream->IsClosed(aLock))
274           return stream;
275       }
276       return nullptr;
277     }
278 
279    private:
280     MediaCache* mMediaCache;
281     int64_t mResourceID;
282     uint32_t mNext;
283   };
284 
285  protected:
MediaCache(MediaBlockCacheBase * aCache)286   explicit MediaCache(MediaBlockCacheBase* aCache)
287       : mMonitor("MediaCache.mMonitor"),
288         mBlockCache(aCache),
289         mUpdateQueued(false)
290 #ifdef DEBUG
291         ,
292         mInUpdate(false)
293 #endif
294   {
295     NS_ASSERTION(NS_IsMainThread(), "Only construct MediaCache on main thread");
296     MOZ_COUNT_CTOR(MediaCache);
297     MediaCacheFlusher::RegisterMediaCache(this);
298     UpdateOnCellular();
299   }
300 
~MediaCache()301   ~MediaCache() {
302     NS_ASSERTION(NS_IsMainThread(), "Only destroy MediaCache on main thread");
303     if (this == gMediaCache) {
304       LOG("~MediaCache(Global file-backed MediaCache)");
305       // This is the file-backed MediaCache, reset the global pointer.
306       gMediaCache = nullptr;
307     } else {
308       LOG("~MediaCache(Memory-backed MediaCache %p)", this);
309     }
310     MediaCacheFlusher::UnregisterMediaCache(this);
311     NS_ASSERTION(mStreams.IsEmpty(), "Stream(s) still open!");
312     Truncate();
313     NS_ASSERTION(mIndex.Length() == 0, "Blocks leaked?");
314 
315     MOZ_COUNT_DTOR(MediaCache);
316   }
317 
CacheSize()318   static size_t CacheSize() {
319     MOZ_ASSERT(sThread->IsOnCurrentThread());
320     return sOnCellular ? StaticPrefs::media_cache_size_cellular()
321                        : StaticPrefs::media_cache_size();
322   }
323 
ReadaheadLimit()324   static size_t ReadaheadLimit() {
325     MOZ_ASSERT(sThread->IsOnCurrentThread());
326     return sOnCellular ? StaticPrefs::media_cache_readahead_limit_cellular()
327                        : StaticPrefs::media_cache_readahead_limit();
328   }
329 
ResumeThreshold()330   static size_t ResumeThreshold() {
331     return sOnCellular ? StaticPrefs::media_cache_resume_threshold_cellular()
332                        : StaticPrefs::media_cache_resume_threshold();
333   }
334 
335   // Find a free or reusable block and return its index. If there are no
336   // free blocks and no reusable blocks, add a new block to the cache
337   // and return it. Can return -1 on OOM.
338   int32_t FindBlockForIncomingData(AutoLock&, TimeStamp aNow,
339                                    MediaCacheStream* aStream,
340                                    int32_t aStreamBlockIndex);
341   // Find a reusable block --- a free block, if there is one, otherwise
342   // the reusable block with the latest predicted-next-use, or -1 if
343   // there aren't any freeable blocks. Only block indices less than
344   // aMaxSearchBlockIndex are considered. If aForStream is non-null,
345   // then aForStream and aForStreamBlock indicate what media data will
346   // be placed; FindReusableBlock will favour returning free blocks
347   // near other blocks for that point in the stream.
348   int32_t FindReusableBlock(AutoLock&, TimeStamp aNow,
349                             MediaCacheStream* aForStream,
350                             int32_t aForStreamBlock,
351                             int32_t aMaxSearchBlockIndex);
352   bool BlockIsReusable(AutoLock&, int32_t aBlockIndex);
353   // Given a list of blocks sorted with the most reusable blocks at the
354   // end, find the last block whose stream is not pinned (if any)
355   // and whose cache entry index is less than aBlockIndexLimit
356   // and append it to aResult.
357   void AppendMostReusableBlock(AutoLock&, BlockList* aBlockList,
358                                nsTArray<uint32_t>* aResult,
359                                int32_t aBlockIndexLimit);
360 
361   enum BlockClass {
362     // block belongs to mMetadataBlockList because data has been consumed
363     // from it in "metadata mode" --- in particular blocks read during
364     // Ogg seeks go into this class. These blocks may have played data
365     // in them too.
366     METADATA_BLOCK,
367     // block belongs to mPlayedBlockList because its offset is
368     // less than the stream's current reader position
369     PLAYED_BLOCK,
370     // block belongs to the stream's mReadaheadBlockList because its
371     // offset is greater than or equal to the stream's current
372     // reader position
373     READAHEAD_BLOCK
374   };
375 
376   struct BlockOwner {
377     constexpr BlockOwner() = default;
378 
379     // The stream that owns this block, or null if the block is free.
380     MediaCacheStream* mStream = nullptr;
381     // The block index in the stream. Valid only if mStream is non-null.
382     // Initialized to an insane value to highlight misuse.
383     uint32_t mStreamBlock = UINT32_MAX;
384     // Time at which this block was last used. Valid only if
385     // mClass is METADATA_BLOCK or PLAYED_BLOCK.
386     TimeStamp mLastUseTime;
387     BlockClass mClass = READAHEAD_BLOCK;
388   };
389 
390   struct Block {
391     // Free blocks have an empty mOwners array
392     nsTArray<BlockOwner> mOwners;
393   };
394 
395   // Get the BlockList that the block should belong to given its
396   // current owner
397   BlockList* GetListForBlock(AutoLock&, BlockOwner* aBlock);
398   // Get the BlockOwner for the given block index and owning stream
399   // (returns null if the stream does not own the block)
400   BlockOwner* GetBlockOwner(AutoLock&, int32_t aBlockIndex,
401                             MediaCacheStream* aStream);
402   // Returns true iff the block is free
IsBlockFree(int32_t aBlockIndex)403   bool IsBlockFree(int32_t aBlockIndex) {
404     return mIndex[aBlockIndex].mOwners.IsEmpty();
405   }
406   // Add the block to the free list and mark its streams as not having
407   // the block in cache
408   void FreeBlock(AutoLock&, int32_t aBlock);
409   // Mark aStream as not having the block, removing it as an owner. If
410   // the block has no more owners it's added to the free list.
411   void RemoveBlockOwner(AutoLock&, int32_t aBlockIndex,
412                         MediaCacheStream* aStream);
413   // Swap all metadata associated with the two blocks. The caller
414   // is responsible for swapping up any cache file state.
415   void SwapBlocks(AutoLock&, int32_t aBlockIndex1, int32_t aBlockIndex2);
416   // Insert the block into the readahead block list for the stream
417   // at the right point in the list.
418   void InsertReadaheadBlock(AutoLock&, BlockOwner* aBlockOwner,
419                             int32_t aBlockIndex);
420 
421   // Guess the duration until block aBlock will be next used
422   TimeDuration PredictNextUse(AutoLock&, TimeStamp aNow, int32_t aBlock);
423   // Guess the duration until the next incoming data on aStream will be used
424   TimeDuration PredictNextUseForIncomingData(AutoLock&,
425                                              MediaCacheStream* aStream);
426 
427   // Truncate the file and index array if there are free blocks at the
428   // end
429   void Truncate();
430 
431   void FlushInternal(AutoLock&);
432 
433   // There is at most one file-backed media cache.
434   // It is owned by all MediaCacheStreams that use it.
435   // This is a raw pointer set by GetMediaCache(), and reset by ~MediaCache(),
436   // both on the main thread; and is not accessed anywhere else.
437   static inline MediaCache* gMediaCache = nullptr;
438 
439   // This member is main-thread only. It's used to allocate unique
440   // resource IDs to streams.
441   int64_t mNextResourceID = 0;
442 
443   // The monitor protects all the data members here. Also, off-main-thread
444   // readers that need to block will Wait() on this monitor. When new
445   // data becomes available in the cache, we NotifyAll() on this monitor.
446   mozilla::Monitor mMonitor;
447   // This must always be accessed when the monitor is held.
448   nsTArray<MediaCacheStream*> mStreams;
449   // The Blocks describing the cache entries.
450   nsTArray<Block> mIndex;
451 
452   RefPtr<MediaBlockCacheBase> mBlockCache;
453   // The list of free blocks; they are not ordered.
454   BlockList mFreeBlocks;
455   // True if an event to run Update() has been queued but not processed
456   bool mUpdateQueued;
457 #ifdef DEBUG
458   bool mInUpdate;
459 #endif
460   // A list of resource IDs to notify about the change in suspended status.
461   nsTArray<int64_t> mSuspendedStatusToNotify;
462   // The thread on which we will run data callbacks from the channels.
463   // Note this thread is shared among all MediaCache instances.
464   static inline StaticRefPtr<nsIThread> sThread;
465   // True if we've tried to init sThread. Note we try once only so it is safe
466   // to access sThread on all threads.
467   static inline bool sThreadInit = false;
468 
469  private:
470   // MediaCache thread only. True if we're on a cellular network connection.
471   static inline bool sOnCellular = false;
472 
473   // Try to trim the cache back to its desired size, if necessary. Return the
474   // amount of free block counts after trimming.
475   int32_t TrimCacheIfNeeded(AutoLock& aLock, const TimeStamp& aNow);
476 
477   struct StreamAction {
478     enum { NONE, SEEK, RESUME, SUSPEND } mTag = NONE;
479     // Members for 'SEEK' only.
480     bool mResume = false;
481     int64_t mSeekTarget = -1;
482   };
483   // In each update, media cache would determine an action for each stream,
484   // possible actions are: keeping the stream unchanged, seeking to the new
485   // position, resuming its channel or suspending its channel. The action would
486   // be determined by considering a lot of different factors, eg. stream's data
487   // offset and length, how many free or reusable blocks are avaliable, the
488   // predicted time for the next block...e.t.c. This function will write the
489   // corresponding action for each stream in `mStreams` into `aActions`.
490   void DetermineActionsForStreams(AutoLock& aLock, const TimeStamp& aNow,
491                                   nsTArray<StreamAction>& aActions,
492                                   int32_t aFreeBlockCount);
493 
494   // Used by MediaCacheStream::GetDebugInfo() only for debugging.
495   // Don't add new callers to this function.
496   friend void MediaCacheStream::GetDebugInfo(
497       dom::MediaCacheStreamDebugInfo& aInfo);
GetMonitorOnTheMainThread()498   mozilla::Monitor& GetMonitorOnTheMainThread() {
499     MOZ_DIAGNOSTIC_ASSERT(NS_IsMainThread());
500     return mMonitor;
501   }
502 };
503 
UpdateOnCellular()504 void MediaCache::UpdateOnCellular() {
505   NS_ASSERTION(NS_IsMainThread(),
506                "Only call on main thread");  // JNI required on Android...
507   bool onCellular = OnCellularConnection();
508   LOG("MediaCache::UpdateOnCellular() onCellular=%d", onCellular);
509   nsCOMPtr<nsIRunnable> r = NS_NewRunnableFunction(
510       "MediaCache::UpdateOnCellular", [=]() { sOnCellular = onCellular; });
511   sThread->Dispatch(r.forget());
512 }
513 
514 NS_IMETHODIMP
Observe(nsISupports * aSubject,char const * aTopic,char16_t const * aData)515 MediaCacheFlusher::Observe(nsISupports* aSubject, char const* aTopic,
516                            char16_t const* aData) {
517   NS_ASSERTION(NS_IsMainThread(), "Only call on main thread");
518 
519   if (strcmp(aTopic, "last-pb-context-exited") == 0) {
520     for (MediaCache* mc : mMediaCaches) {
521       mc->CloseStreamsForPrivateBrowsing();
522     }
523     return NS_OK;
524   }
525   if (strcmp(aTopic, "cacheservice:empty-cache") == 0) {
526     for (MediaCache* mc : mMediaCaches) {
527       mc->Flush();
528     }
529     return NS_OK;
530   }
531   if (strcmp(aTopic, "contentchild:network-link-type-changed") == 0 ||
532       strcmp(aTopic, NS_NETWORK_LINK_TYPE_TOPIC) == 0) {
533     MediaCache::UpdateOnCellular();
534   }
535   return NS_OK;
536 }
537 
MediaCacheStream(ChannelMediaResource * aClient,bool aIsPrivateBrowsing)538 MediaCacheStream::MediaCacheStream(ChannelMediaResource* aClient,
539                                    bool aIsPrivateBrowsing)
540     : mMediaCache(nullptr),
541       mClient(aClient),
542       mIsTransportSeekable(false),
543       mCacheSuspended(false),
544       mChannelEnded(false),
545       mStreamOffset(0),
546       mPlaybackBytesPerSecond(10000),
547       mPinCount(0),
548       mNotifyDataEndedStatus(NS_ERROR_NOT_INITIALIZED),
549       mIsPrivateBrowsing(aIsPrivateBrowsing) {}
550 
SizeOfExcludingThis(MallocSizeOf aMallocSizeOf) const551 size_t MediaCacheStream::SizeOfExcludingThis(MallocSizeOf aMallocSizeOf) const {
552   AutoLock lock(mMediaCache->Monitor());
553 
554   // Looks like these are not owned:
555   // - mClient
556   size_t size = mBlocks.ShallowSizeOfExcludingThis(aMallocSizeOf);
557   size += mReadaheadBlocks.SizeOfExcludingThis(aMallocSizeOf);
558   size += mMetadataBlocks.SizeOfExcludingThis(aMallocSizeOf);
559   size += mPlayedBlocks.SizeOfExcludingThis(aMallocSizeOf);
560   size += aMallocSizeOf(mPartialBlockBuffer.get());
561 
562   return size;
563 }
564 
SizeOfExcludingThis(MallocSizeOf aMallocSizeOf) const565 size_t MediaCacheStream::BlockList::SizeOfExcludingThis(
566     MallocSizeOf aMallocSizeOf) const {
567   return mEntries.ShallowSizeOfExcludingThis(aMallocSizeOf);
568 }
569 
AddFirstBlock(int32_t aBlock)570 void MediaCacheStream::BlockList::AddFirstBlock(int32_t aBlock) {
571   NS_ASSERTION(!mEntries.GetEntry(aBlock), "Block already in list");
572   Entry* entry = mEntries.PutEntry(aBlock);
573 
574   if (mFirstBlock < 0) {
575     entry->mNextBlock = entry->mPrevBlock = aBlock;
576   } else {
577     entry->mNextBlock = mFirstBlock;
578     entry->mPrevBlock = mEntries.GetEntry(mFirstBlock)->mPrevBlock;
579     mEntries.GetEntry(entry->mNextBlock)->mPrevBlock = aBlock;
580     mEntries.GetEntry(entry->mPrevBlock)->mNextBlock = aBlock;
581   }
582   mFirstBlock = aBlock;
583   ++mCount;
584 }
585 
AddAfter(int32_t aBlock,int32_t aBefore)586 void MediaCacheStream::BlockList::AddAfter(int32_t aBlock, int32_t aBefore) {
587   NS_ASSERTION(!mEntries.GetEntry(aBlock), "Block already in list");
588   Entry* entry = mEntries.PutEntry(aBlock);
589 
590   Entry* addAfter = mEntries.GetEntry(aBefore);
591   NS_ASSERTION(addAfter, "aBefore not in list");
592 
593   entry->mNextBlock = addAfter->mNextBlock;
594   entry->mPrevBlock = aBefore;
595   mEntries.GetEntry(entry->mNextBlock)->mPrevBlock = aBlock;
596   mEntries.GetEntry(entry->mPrevBlock)->mNextBlock = aBlock;
597   ++mCount;
598 }
599 
RemoveBlock(int32_t aBlock)600 void MediaCacheStream::BlockList::RemoveBlock(int32_t aBlock) {
601   Entry* entry = mEntries.GetEntry(aBlock);
602   MOZ_DIAGNOSTIC_ASSERT(entry, "Block not in list");
603 
604   if (entry->mNextBlock == aBlock) {
605     MOZ_DIAGNOSTIC_ASSERT(entry->mPrevBlock == aBlock,
606                           "Linked list inconsistency");
607     MOZ_DIAGNOSTIC_ASSERT(mFirstBlock == aBlock, "Linked list inconsistency");
608     mFirstBlock = -1;
609   } else {
610     if (mFirstBlock == aBlock) {
611       mFirstBlock = entry->mNextBlock;
612     }
613     mEntries.GetEntry(entry->mNextBlock)->mPrevBlock = entry->mPrevBlock;
614     mEntries.GetEntry(entry->mPrevBlock)->mNextBlock = entry->mNextBlock;
615   }
616   mEntries.RemoveEntry(entry);
617   --mCount;
618 }
619 
GetLastBlock() const620 int32_t MediaCacheStream::BlockList::GetLastBlock() const {
621   if (mFirstBlock < 0) return -1;
622   return mEntries.GetEntry(mFirstBlock)->mPrevBlock;
623 }
624 
GetNextBlock(int32_t aBlock) const625 int32_t MediaCacheStream::BlockList::GetNextBlock(int32_t aBlock) const {
626   int32_t block = mEntries.GetEntry(aBlock)->mNextBlock;
627   if (block == mFirstBlock) return -1;
628   return block;
629 }
630 
GetPrevBlock(int32_t aBlock) const631 int32_t MediaCacheStream::BlockList::GetPrevBlock(int32_t aBlock) const {
632   if (aBlock == mFirstBlock) return -1;
633   return mEntries.GetEntry(aBlock)->mPrevBlock;
634 }
635 
636 #ifdef DEBUG
Verify()637 void MediaCacheStream::BlockList::Verify() {
638   int32_t count = 0;
639   if (mFirstBlock >= 0) {
640     int32_t block = mFirstBlock;
641     do {
642       Entry* entry = mEntries.GetEntry(block);
643       NS_ASSERTION(mEntries.GetEntry(entry->mNextBlock)->mPrevBlock == block,
644                    "Bad prev link");
645       NS_ASSERTION(mEntries.GetEntry(entry->mPrevBlock)->mNextBlock == block,
646                    "Bad next link");
647       block = entry->mNextBlock;
648       ++count;
649     } while (block != mFirstBlock);
650   }
651   NS_ASSERTION(count == mCount, "Bad count");
652 }
653 #endif
654 
UpdateSwappedBlockIndex(int32_t * aBlockIndex,int32_t aBlock1Index,int32_t aBlock2Index)655 static void UpdateSwappedBlockIndex(int32_t* aBlockIndex, int32_t aBlock1Index,
656                                     int32_t aBlock2Index) {
657   int32_t index = *aBlockIndex;
658   if (index == aBlock1Index) {
659     *aBlockIndex = aBlock2Index;
660   } else if (index == aBlock2Index) {
661     *aBlockIndex = aBlock1Index;
662   }
663 }
664 
NotifyBlockSwapped(int32_t aBlockIndex1,int32_t aBlockIndex2)665 void MediaCacheStream::BlockList::NotifyBlockSwapped(int32_t aBlockIndex1,
666                                                      int32_t aBlockIndex2) {
667   Entry* e1 = mEntries.GetEntry(aBlockIndex1);
668   Entry* e2 = mEntries.GetEntry(aBlockIndex2);
669   int32_t e1Prev = -1, e1Next = -1, e2Prev = -1, e2Next = -1;
670 
671   // Fix mFirstBlock
672   UpdateSwappedBlockIndex(&mFirstBlock, aBlockIndex1, aBlockIndex2);
673 
674   // Fix mNextBlock/mPrevBlock links. First capture previous/next links
675   // so we don't get confused due to aliasing.
676   if (e1) {
677     e1Prev = e1->mPrevBlock;
678     e1Next = e1->mNextBlock;
679   }
680   if (e2) {
681     e2Prev = e2->mPrevBlock;
682     e2Next = e2->mNextBlock;
683   }
684   // Update the entries.
685   if (e1) {
686     mEntries.GetEntry(e1Prev)->mNextBlock = aBlockIndex2;
687     mEntries.GetEntry(e1Next)->mPrevBlock = aBlockIndex2;
688   }
689   if (e2) {
690     mEntries.GetEntry(e2Prev)->mNextBlock = aBlockIndex1;
691     mEntries.GetEntry(e2Next)->mPrevBlock = aBlockIndex1;
692   }
693 
694   // Fix hashtable keys. First remove stale entries.
695   if (e1) {
696     e1Prev = e1->mPrevBlock;
697     e1Next = e1->mNextBlock;
698     mEntries.RemoveEntry(e1);
699     // Refresh pointer after hashtable mutation.
700     e2 = mEntries.GetEntry(aBlockIndex2);
701   }
702   if (e2) {
703     e2Prev = e2->mPrevBlock;
704     e2Next = e2->mNextBlock;
705     mEntries.RemoveEntry(e2);
706   }
707   // Put new entries back.
708   if (e1) {
709     e1 = mEntries.PutEntry(aBlockIndex2);
710     e1->mNextBlock = e1Next;
711     e1->mPrevBlock = e1Prev;
712   }
713   if (e2) {
714     e2 = mEntries.PutEntry(aBlockIndex1);
715     e2->mNextBlock = e2Next;
716     e2->mPrevBlock = e2Prev;
717   }
718 }
719 
FlushInternal(AutoLock & aLock)720 void MediaCache::FlushInternal(AutoLock& aLock) {
721   for (uint32_t blockIndex = 0; blockIndex < mIndex.Length(); ++blockIndex) {
722     FreeBlock(aLock, blockIndex);
723   }
724 
725   // Truncate index array.
726   Truncate();
727   NS_ASSERTION(mIndex.Length() == 0, "Blocks leaked?");
728   // Reset block cache to its pristine state.
729   mBlockCache->Flush();
730 }
731 
Flush()732 void MediaCache::Flush() {
733   MOZ_ASSERT(NS_IsMainThread());
734   nsCOMPtr<nsIRunnable> r = NS_NewRunnableFunction(
735       "MediaCache::Flush", [self = RefPtr<MediaCache>(this)]() mutable {
736         AutoLock lock(self->mMonitor);
737         self->FlushInternal(lock);
738         // Ensure MediaCache is deleted on the main thread.
739         NS_ReleaseOnMainThread("MediaCache::Flush", self.forget());
740       });
741   sThread->Dispatch(r.forget());
742 }
743 
CloseStreamsForPrivateBrowsing()744 void MediaCache::CloseStreamsForPrivateBrowsing() {
745   MOZ_ASSERT(NS_IsMainThread());
746   sThread->Dispatch(NS_NewRunnableFunction(
747       "MediaCache::CloseStreamsForPrivateBrowsing",
748       [self = RefPtr<MediaCache>(this)]() mutable {
749         AutoLock lock(self->mMonitor);
750         // Copy mStreams since CloseInternal() will change the array.
751         for (MediaCacheStream* s : self->mStreams.Clone()) {
752           if (s->mIsPrivateBrowsing) {
753             s->CloseInternal(lock);
754           }
755         }
756         // Ensure MediaCache is deleted on the main thread.
757         NS_ReleaseOnMainThread("MediaCache::CloseStreamsForPrivateBrowsing",
758                                self.forget());
759       }));
760 }
761 
762 /* static */
GetMediaCache(int64_t aContentLength,bool aIsPrivateBrowsing)763 RefPtr<MediaCache> MediaCache::GetMediaCache(int64_t aContentLength,
764                                              bool aIsPrivateBrowsing) {
765   NS_ASSERTION(NS_IsMainThread(), "Only call on main thread");
766 
767   if (!sThreadInit) {
768     sThreadInit = true;
769     nsCOMPtr<nsIThread> thread;
770     nsresult rv = NS_NewNamedThread("MediaCache", getter_AddRefs(thread));
771     if (NS_FAILED(rv)) {
772       NS_WARNING("Failed to create a thread for MediaCache.");
773       return nullptr;
774     }
775     sThread = ToRefPtr(std::move(thread));
776 
777     static struct ClearThread {
778       // Called during shutdown to clear sThread.
779       void operator=(std::nullptr_t) {
780         MOZ_ASSERT(sThread, "We should only clear sThread once.");
781         sThread->Shutdown();
782         sThread = nullptr;
783       }
784     } sClearThread;
785     ClearOnShutdown(&sClearThread, ShutdownPhase::XPCOMShutdownThreads);
786   }
787 
788   if (!sThread) {
789     return nullptr;
790   }
791 
792   const int64_t mediaMemoryCacheMaxSize =
793       static_cast<int64_t>(StaticPrefs::media_memory_cache_max_size()) * 1024;
794 
795   // Force usage of in-memory cache if we are in private browsing mode
796   // and the forceMediaMemoryCache pref is set
797   // We will not attempt to create an on-disk cache if this is the case
798   const bool forceMediaMemoryCache =
799       aIsPrivateBrowsing &&
800       StaticPrefs::browser_privatebrowsing_forceMediaMemoryCache();
801 
802   // Alternatively, use an in-memory cache if the media will fit entirely
803   // in memory
804   // aContentLength < 0 indicates we do not know content's actual size
805   const bool contentFitsInMediaMemoryCache =
806       (aContentLength > 0) && (aContentLength <= mediaMemoryCacheMaxSize);
807 
808   // Try to allocate a memory cache for our content
809   if (contentFitsInMediaMemoryCache || forceMediaMemoryCache) {
810     // Figure out how large our cache should be
811     int64_t cacheSize = 0;
812     if (contentFitsInMediaMemoryCache) {
813       cacheSize = aContentLength;
814     } else if (forceMediaMemoryCache) {
815       // Unknown content length, we'll give the maximum allowed cache size
816       // just to be sure.
817       if (aContentLength < 0) {
818         cacheSize = mediaMemoryCacheMaxSize;
819       } else {
820         // If the content length is less than the maximum allowed cache size,
821         // use that, otherwise we cap it to max size.
822         cacheSize = std::min(aContentLength, mediaMemoryCacheMaxSize);
823       }
824     }
825 
826     RefPtr<MediaBlockCacheBase> bc = new MemoryBlockCache(cacheSize);
827     nsresult rv = bc->Init();
828     if (NS_SUCCEEDED(rv)) {
829       RefPtr<MediaCache> mc = new MediaCache(bc);
830       LOG("GetMediaCache(%" PRIi64 ") -> Memory MediaCache %p", aContentLength,
831           mc.get());
832       return mc;
833     }
834 
835     // MemoryBlockCache initialization failed.
836     // If we require use of a memory media cache, we will bail here.
837     // Otherwise use a file-backed MediaCache below.
838     if (forceMediaMemoryCache) {
839       return nullptr;
840     }
841   }
842 
843   if (gMediaCache) {
844     LOG("GetMediaCache(%" PRIi64 ") -> Existing file-backed MediaCache",
845         aContentLength);
846     return gMediaCache;
847   }
848 
849   RefPtr<MediaBlockCacheBase> bc = new FileBlockCache();
850   nsresult rv = bc->Init();
851   if (NS_SUCCEEDED(rv)) {
852     gMediaCache = new MediaCache(bc);
853     LOG("GetMediaCache(%" PRIi64 ") -> Created file-backed MediaCache",
854         aContentLength);
855   } else {
856     LOG("GetMediaCache(%" PRIi64 ") -> Failed to create file-backed MediaCache",
857         aContentLength);
858   }
859 
860   return gMediaCache;
861 }
862 
ReadCacheFile(AutoLock &,int64_t aOffset,void * aData,int32_t aLength,int32_t * aBytes)863 nsresult MediaCache::ReadCacheFile(AutoLock&, int64_t aOffset, void* aData,
864                                    int32_t aLength, int32_t* aBytes) {
865   if (!mBlockCache) {
866     return NS_ERROR_FAILURE;
867   }
868   return mBlockCache->Read(aOffset, reinterpret_cast<uint8_t*>(aData), aLength,
869                            aBytes);
870 }
871 
872 // Allowed range is whatever can be accessed with an int32_t block index.
IsOffsetAllowed(int64_t aOffset)873 static bool IsOffsetAllowed(int64_t aOffset) {
874   return aOffset < (int64_t(INT32_MAX) + 1) * MediaCache::BLOCK_SIZE &&
875          aOffset >= 0;
876 }
877 
878 // Convert 64-bit offset to 32-bit block index.
879 // Assumes offset range-check was already done.
OffsetToBlockIndexUnchecked(int64_t aOffset)880 static int32_t OffsetToBlockIndexUnchecked(int64_t aOffset) {
881   // Still check for allowed range in debug builds, to catch out-of-range
882   // issues early during development.
883   MOZ_ASSERT(IsOffsetAllowed(aOffset));
884   return int32_t(aOffset / MediaCache::BLOCK_SIZE);
885 }
886 
887 // Convert 64-bit offset to 32-bit block index. -1 if out of allowed range.
OffsetToBlockIndex(int64_t aOffset)888 static int32_t OffsetToBlockIndex(int64_t aOffset) {
889   return IsOffsetAllowed(aOffset) ? OffsetToBlockIndexUnchecked(aOffset) : -1;
890 }
891 
892 // Convert 64-bit offset to 32-bit offset inside a block.
893 // Will not fail (even if offset is outside allowed range), so there is no
894 // need to check for errors.
OffsetInBlock(int64_t aOffset)895 static int32_t OffsetInBlock(int64_t aOffset) {
896   // Still check for allowed range in debug builds, to catch out-of-range
897   // issues early during development.
898   MOZ_ASSERT(IsOffsetAllowed(aOffset));
899   return int32_t(aOffset % MediaCache::BLOCK_SIZE);
900 }
901 
FindBlockForIncomingData(AutoLock & aLock,TimeStamp aNow,MediaCacheStream * aStream,int32_t aStreamBlockIndex)902 int32_t MediaCache::FindBlockForIncomingData(AutoLock& aLock, TimeStamp aNow,
903                                              MediaCacheStream* aStream,
904                                              int32_t aStreamBlockIndex) {
905   MOZ_ASSERT(sThread->IsOnCurrentThread());
906 
907   int32_t blockIndex =
908       FindReusableBlock(aLock, aNow, aStream, aStreamBlockIndex, INT32_MAX);
909 
910   if (blockIndex < 0 || !IsBlockFree(blockIndex)) {
911     // The block returned is already allocated.
912     // Don't reuse it if a) there's room to expand the cache or
913     // b) the data we're going to store in the free block is not higher
914     // priority than the data already stored in the free block.
915     // The latter can lead us to go over the cache limit a bit.
916     if ((mIndex.Length() <
917              uint32_t(mBlockCache->GetMaxBlocks(MediaCache::CacheSize())) ||
918          blockIndex < 0 ||
919          PredictNextUseForIncomingData(aLock, aStream) >=
920              PredictNextUse(aLock, aNow, blockIndex))) {
921       blockIndex = mIndex.Length();
922       // XXX(Bug 1631371) Check if this should use a fallible operation as it
923       // pretended earlier.
924       mIndex.AppendElement();
925       mFreeBlocks.AddFirstBlock(blockIndex);
926       return blockIndex;
927     }
928   }
929 
930   return blockIndex;
931 }
932 
BlockIsReusable(AutoLock &,int32_t aBlockIndex)933 bool MediaCache::BlockIsReusable(AutoLock&, int32_t aBlockIndex) {
934   Block* block = &mIndex[aBlockIndex];
935   for (uint32_t i = 0; i < block->mOwners.Length(); ++i) {
936     MediaCacheStream* stream = block->mOwners[i].mStream;
937     if (stream->mPinCount > 0 ||
938         uint32_t(OffsetToBlockIndex(stream->mStreamOffset)) ==
939             block->mOwners[i].mStreamBlock) {
940       return false;
941     }
942   }
943   return true;
944 }
945 
AppendMostReusableBlock(AutoLock & aLock,BlockList * aBlockList,nsTArray<uint32_t> * aResult,int32_t aBlockIndexLimit)946 void MediaCache::AppendMostReusableBlock(AutoLock& aLock, BlockList* aBlockList,
947                                          nsTArray<uint32_t>* aResult,
948                                          int32_t aBlockIndexLimit) {
949   int32_t blockIndex = aBlockList->GetLastBlock();
950   if (blockIndex < 0) return;
951   do {
952     // Don't consider blocks for pinned streams, or blocks that are
953     // beyond the specified limit, or a block that contains a stream's
954     // current read position (such a block contains both played data
955     // and readahead data)
956     if (blockIndex < aBlockIndexLimit && BlockIsReusable(aLock, blockIndex)) {
957       aResult->AppendElement(blockIndex);
958       return;
959     }
960     blockIndex = aBlockList->GetPrevBlock(blockIndex);
961   } while (blockIndex >= 0);
962 }
963 
FindReusableBlock(AutoLock & aLock,TimeStamp aNow,MediaCacheStream * aForStream,int32_t aForStreamBlock,int32_t aMaxSearchBlockIndex)964 int32_t MediaCache::FindReusableBlock(AutoLock& aLock, TimeStamp aNow,
965                                       MediaCacheStream* aForStream,
966                                       int32_t aForStreamBlock,
967                                       int32_t aMaxSearchBlockIndex) {
968   MOZ_ASSERT(sThread->IsOnCurrentThread());
969 
970   uint32_t length =
971       std::min(uint32_t(aMaxSearchBlockIndex), uint32_t(mIndex.Length()));
972 
973   if (aForStream && aForStreamBlock > 0 &&
974       uint32_t(aForStreamBlock) <= aForStream->mBlocks.Length()) {
975     int32_t prevCacheBlock = aForStream->mBlocks[aForStreamBlock - 1];
976     if (prevCacheBlock >= 0) {
977       uint32_t freeBlockScanEnd =
978           std::min(length, prevCacheBlock + FREE_BLOCK_SCAN_LIMIT);
979       for (uint32_t i = prevCacheBlock; i < freeBlockScanEnd; ++i) {
980         if (IsBlockFree(i)) return i;
981       }
982     }
983   }
984 
985   if (!mFreeBlocks.IsEmpty()) {
986     int32_t blockIndex = mFreeBlocks.GetFirstBlock();
987     do {
988       if (blockIndex < aMaxSearchBlockIndex) return blockIndex;
989       blockIndex = mFreeBlocks.GetNextBlock(blockIndex);
990     } while (blockIndex >= 0);
991   }
992 
993   // Build a list of the blocks we should consider for the "latest
994   // predicted time of next use". We can exploit the fact that the block
995   // linked lists are ordered by increasing time of next use. This is
996   // actually the whole point of having the linked lists.
997   AutoTArray<uint32_t, 8> candidates;
998   for (uint32_t i = 0; i < mStreams.Length(); ++i) {
999     MediaCacheStream* stream = mStreams[i];
1000     if (stream->mPinCount > 0) {
1001       // No point in even looking at this stream's blocks
1002       continue;
1003     }
1004 
1005     AppendMostReusableBlock(aLock, &stream->mMetadataBlocks, &candidates,
1006                             length);
1007     AppendMostReusableBlock(aLock, &stream->mPlayedBlocks, &candidates, length);
1008 
1009     // Don't consider readahead blocks in non-seekable streams. If we
1010     // remove the block we won't be able to seek back to read it later.
1011     if (stream->mIsTransportSeekable) {
1012       AppendMostReusableBlock(aLock, &stream->mReadaheadBlocks, &candidates,
1013                               length);
1014     }
1015   }
1016 
1017   TimeDuration latestUse;
1018   int32_t latestUseBlock = -1;
1019   for (uint32_t i = 0; i < candidates.Length(); ++i) {
1020     TimeDuration nextUse = PredictNextUse(aLock, aNow, candidates[i]);
1021     if (nextUse > latestUse) {
1022       latestUse = nextUse;
1023       latestUseBlock = candidates[i];
1024     }
1025   }
1026 
1027   return latestUseBlock;
1028 }
1029 
GetListForBlock(AutoLock &,BlockOwner * aBlock)1030 MediaCache::BlockList* MediaCache::GetListForBlock(AutoLock&,
1031                                                    BlockOwner* aBlock) {
1032   switch (aBlock->mClass) {
1033     case METADATA_BLOCK:
1034       NS_ASSERTION(aBlock->mStream, "Metadata block has no stream?");
1035       return &aBlock->mStream->mMetadataBlocks;
1036     case PLAYED_BLOCK:
1037       NS_ASSERTION(aBlock->mStream, "Metadata block has no stream?");
1038       return &aBlock->mStream->mPlayedBlocks;
1039     case READAHEAD_BLOCK:
1040       NS_ASSERTION(aBlock->mStream, "Readahead block has no stream?");
1041       return &aBlock->mStream->mReadaheadBlocks;
1042     default:
1043       NS_ERROR("Invalid block class");
1044       return nullptr;
1045   }
1046 }
1047 
GetBlockOwner(AutoLock &,int32_t aBlockIndex,MediaCacheStream * aStream)1048 MediaCache::BlockOwner* MediaCache::GetBlockOwner(AutoLock&,
1049                                                   int32_t aBlockIndex,
1050                                                   MediaCacheStream* aStream) {
1051   Block* block = &mIndex[aBlockIndex];
1052   for (uint32_t i = 0; i < block->mOwners.Length(); ++i) {
1053     if (block->mOwners[i].mStream == aStream) return &block->mOwners[i];
1054   }
1055   return nullptr;
1056 }
1057 
SwapBlocks(AutoLock & aLock,int32_t aBlockIndex1,int32_t aBlockIndex2)1058 void MediaCache::SwapBlocks(AutoLock& aLock, int32_t aBlockIndex1,
1059                             int32_t aBlockIndex2) {
1060   Block* block1 = &mIndex[aBlockIndex1];
1061   Block* block2 = &mIndex[aBlockIndex2];
1062 
1063   block1->mOwners.SwapElements(block2->mOwners);
1064 
1065   // Now all references to block1 have to be replaced with block2 and
1066   // vice versa.
1067   // First update stream references to blocks via mBlocks.
1068   const Block* blocks[] = {block1, block2};
1069   int32_t blockIndices[] = {aBlockIndex1, aBlockIndex2};
1070   for (int32_t i = 0; i < 2; ++i) {
1071     for (uint32_t j = 0; j < blocks[i]->mOwners.Length(); ++j) {
1072       const BlockOwner* b = &blocks[i]->mOwners[j];
1073       b->mStream->mBlocks[b->mStreamBlock] = blockIndices[i];
1074     }
1075   }
1076 
1077   // Now update references to blocks in block lists.
1078   mFreeBlocks.NotifyBlockSwapped(aBlockIndex1, aBlockIndex2);
1079 
1080   nsTHashSet<MediaCacheStream*> visitedStreams;
1081 
1082   for (int32_t i = 0; i < 2; ++i) {
1083     for (uint32_t j = 0; j < blocks[i]->mOwners.Length(); ++j) {
1084       MediaCacheStream* stream = blocks[i]->mOwners[j].mStream;
1085       // Make sure that we don't update the same stream twice --- that
1086       // would result in swapping the block references back again!
1087       if (!visitedStreams.EnsureInserted(stream)) continue;
1088       stream->mReadaheadBlocks.NotifyBlockSwapped(aBlockIndex1, aBlockIndex2);
1089       stream->mPlayedBlocks.NotifyBlockSwapped(aBlockIndex1, aBlockIndex2);
1090       stream->mMetadataBlocks.NotifyBlockSwapped(aBlockIndex1, aBlockIndex2);
1091     }
1092   }
1093 
1094   Verify(aLock);
1095 }
1096 
RemoveBlockOwner(AutoLock & aLock,int32_t aBlockIndex,MediaCacheStream * aStream)1097 void MediaCache::RemoveBlockOwner(AutoLock& aLock, int32_t aBlockIndex,
1098                                   MediaCacheStream* aStream) {
1099   Block* block = &mIndex[aBlockIndex];
1100   for (uint32_t i = 0; i < block->mOwners.Length(); ++i) {
1101     BlockOwner* bo = &block->mOwners[i];
1102     if (bo->mStream == aStream) {
1103       GetListForBlock(aLock, bo)->RemoveBlock(aBlockIndex);
1104       bo->mStream->mBlocks[bo->mStreamBlock] = -1;
1105       block->mOwners.RemoveElementAt(i);
1106       if (block->mOwners.IsEmpty()) {
1107         mFreeBlocks.AddFirstBlock(aBlockIndex);
1108       }
1109       return;
1110     }
1111   }
1112 }
1113 
AddBlockOwnerAsReadahead(AutoLock & aLock,int32_t aBlockIndex,MediaCacheStream * aStream,int32_t aStreamBlockIndex)1114 void MediaCache::AddBlockOwnerAsReadahead(AutoLock& aLock, int32_t aBlockIndex,
1115                                           MediaCacheStream* aStream,
1116                                           int32_t aStreamBlockIndex) {
1117   Block* block = &mIndex[aBlockIndex];
1118   if (block->mOwners.IsEmpty()) {
1119     mFreeBlocks.RemoveBlock(aBlockIndex);
1120   }
1121   BlockOwner* bo = block->mOwners.AppendElement();
1122   bo->mStream = aStream;
1123   bo->mStreamBlock = aStreamBlockIndex;
1124   aStream->mBlocks[aStreamBlockIndex] = aBlockIndex;
1125   bo->mClass = READAHEAD_BLOCK;
1126   InsertReadaheadBlock(aLock, bo, aBlockIndex);
1127 }
1128 
FreeBlock(AutoLock & aLock,int32_t aBlock)1129 void MediaCache::FreeBlock(AutoLock& aLock, int32_t aBlock) {
1130   Block* block = &mIndex[aBlock];
1131   if (block->mOwners.IsEmpty()) {
1132     // already free
1133     return;
1134   }
1135 
1136   LOG("Released block %d", aBlock);
1137 
1138   for (uint32_t i = 0; i < block->mOwners.Length(); ++i) {
1139     BlockOwner* bo = &block->mOwners[i];
1140     GetListForBlock(aLock, bo)->RemoveBlock(aBlock);
1141     bo->mStream->mBlocks[bo->mStreamBlock] = -1;
1142   }
1143   block->mOwners.Clear();
1144   mFreeBlocks.AddFirstBlock(aBlock);
1145   Verify(aLock);
1146 }
1147 
PredictNextUse(AutoLock &,TimeStamp aNow,int32_t aBlock)1148 TimeDuration MediaCache::PredictNextUse(AutoLock&, TimeStamp aNow,
1149                                         int32_t aBlock) {
1150   MOZ_ASSERT(sThread->IsOnCurrentThread());
1151   NS_ASSERTION(!IsBlockFree(aBlock), "aBlock is free");
1152 
1153   Block* block = &mIndex[aBlock];
1154   // Blocks can be belong to multiple streams. The predicted next use
1155   // time is the earliest time predicted by any of the streams.
1156   TimeDuration result;
1157   for (uint32_t i = 0; i < block->mOwners.Length(); ++i) {
1158     BlockOwner* bo = &block->mOwners[i];
1159     TimeDuration prediction;
1160     switch (bo->mClass) {
1161       case METADATA_BLOCK:
1162         // This block should be managed in LRU mode. For metadata we predict
1163         // that the time until the next use is the time since the last use.
1164         prediction = aNow - bo->mLastUseTime;
1165         break;
1166       case PLAYED_BLOCK: {
1167         // This block should be managed in LRU mode, and we should impose
1168         // a "replay delay" to reflect the likelihood of replay happening
1169         NS_ASSERTION(static_cast<int64_t>(bo->mStreamBlock) * BLOCK_SIZE <
1170                          bo->mStream->mStreamOffset,
1171                      "Played block after the current stream position?");
1172         int64_t bytesBehind =
1173             bo->mStream->mStreamOffset -
1174             static_cast<int64_t>(bo->mStreamBlock) * BLOCK_SIZE;
1175         int64_t millisecondsBehind =
1176             bytesBehind * 1000 / bo->mStream->mPlaybackBytesPerSecond;
1177         prediction = TimeDuration::FromMilliseconds(std::min<int64_t>(
1178             millisecondsBehind * REPLAY_PENALTY_FACTOR, INT32_MAX));
1179         break;
1180       }
1181       case READAHEAD_BLOCK: {
1182         int64_t bytesAhead =
1183             static_cast<int64_t>(bo->mStreamBlock) * BLOCK_SIZE -
1184             bo->mStream->mStreamOffset;
1185         NS_ASSERTION(bytesAhead >= 0,
1186                      "Readahead block before the current stream position?");
1187         int64_t millisecondsAhead =
1188             bytesAhead * 1000 / bo->mStream->mPlaybackBytesPerSecond;
1189         prediction = TimeDuration::FromMilliseconds(
1190             std::min<int64_t>(millisecondsAhead, INT32_MAX));
1191         break;
1192       }
1193       default:
1194         NS_ERROR("Invalid class for predicting next use");
1195         return TimeDuration(0);
1196     }
1197     if (i == 0 || prediction < result) {
1198       result = prediction;
1199     }
1200   }
1201   return result;
1202 }
1203 
PredictNextUseForIncomingData(AutoLock &,MediaCacheStream * aStream)1204 TimeDuration MediaCache::PredictNextUseForIncomingData(
1205     AutoLock&, MediaCacheStream* aStream) {
1206   MOZ_ASSERT(sThread->IsOnCurrentThread());
1207 
1208   int64_t bytesAhead = aStream->mChannelOffset - aStream->mStreamOffset;
1209   if (bytesAhead <= -BLOCK_SIZE) {
1210     // Hmm, no idea when data behind us will be used. Guess 24 hours.
1211     return TimeDuration::FromSeconds(24 * 60 * 60);
1212   }
1213   if (bytesAhead <= 0) return TimeDuration(0);
1214   int64_t millisecondsAhead =
1215       bytesAhead * 1000 / aStream->mPlaybackBytesPerSecond;
1216   return TimeDuration::FromMilliseconds(
1217       std::min<int64_t>(millisecondsAhead, INT32_MAX));
1218 }
1219 
Update()1220 void MediaCache::Update() {
1221   MOZ_ASSERT(sThread->IsOnCurrentThread());
1222 
1223   AutoLock lock(mMonitor);
1224 
1225   mUpdateQueued = false;
1226 #ifdef DEBUG
1227   mInUpdate = true;
1228 #endif
1229   const TimeStamp now = TimeStamp::Now();
1230   const int32_t freeBlockCount = TrimCacheIfNeeded(lock, now);
1231 
1232   // The action to use for each stream. We store these so we can make
1233   // decisions while holding the cache lock but implement those decisions
1234   // without holding the cache lock, since we need to call out to
1235   // stream, decoder and element code.
1236   AutoTArray<StreamAction, 10> actions;
1237   DetermineActionsForStreams(lock, now, actions, freeBlockCount);
1238 
1239 #ifdef DEBUG
1240   mInUpdate = false;
1241 #endif
1242 
1243   // First, update the mCacheSuspended/mCacheEnded flags so that they're all
1244   // correct when we fire our CacheClient commands below. Those commands can
1245   // rely on these flags being set correctly for all streams.
1246   for (uint32_t i = 0; i < mStreams.Length(); ++i) {
1247     MediaCacheStream* stream = mStreams[i];
1248     switch (actions[i].mTag) {
1249       case StreamAction::SEEK:
1250         stream->mCacheSuspended = false;
1251         stream->mChannelEnded = false;
1252         break;
1253       case StreamAction::RESUME:
1254         stream->mCacheSuspended = false;
1255         break;
1256       case StreamAction::SUSPEND:
1257         stream->mCacheSuspended = true;
1258         break;
1259       default:
1260         break;
1261     }
1262   }
1263 
1264   for (uint32_t i = 0; i < mStreams.Length(); ++i) {
1265     MediaCacheStream* stream = mStreams[i];
1266     switch (actions[i].mTag) {
1267       case StreamAction::SEEK:
1268         LOG("Stream %p CacheSeek to %" PRId64 " (resume=%d)", stream,
1269             actions[i].mSeekTarget, actions[i].mResume);
1270         stream->mClient->CacheClientSeek(actions[i].mSeekTarget,
1271                                          actions[i].mResume);
1272         break;
1273       case StreamAction::RESUME:
1274         LOG("Stream %p Resumed", stream);
1275         stream->mClient->CacheClientResume();
1276         QueueSuspendedStatusUpdate(lock, stream->mResourceID);
1277         break;
1278       case StreamAction::SUSPEND:
1279         LOG("Stream %p Suspended", stream);
1280         stream->mClient->CacheClientSuspend();
1281         QueueSuspendedStatusUpdate(lock, stream->mResourceID);
1282         break;
1283       default:
1284         break;
1285     }
1286   }
1287 
1288   // Notify streams about the suspended status changes.
1289   for (uint32_t i = 0; i < mSuspendedStatusToNotify.Length(); ++i) {
1290     MediaCache::ResourceStreamIterator iter(this, mSuspendedStatusToNotify[i]);
1291     while (MediaCacheStream* stream = iter.Next(lock)) {
1292       stream->mClient->CacheClientNotifySuspendedStatusChanged(
1293           stream->AreAllStreamsForResourceSuspended(lock));
1294     }
1295   }
1296   mSuspendedStatusToNotify.Clear();
1297 }
1298 
TrimCacheIfNeeded(AutoLock & aLock,const TimeStamp & aNow)1299 int32_t MediaCache::TrimCacheIfNeeded(AutoLock& aLock, const TimeStamp& aNow) {
1300   MOZ_ASSERT(sThread->IsOnCurrentThread());
1301 
1302   const int32_t maxBlocks = mBlockCache->GetMaxBlocks(MediaCache::CacheSize());
1303 
1304   int32_t freeBlockCount = mFreeBlocks.GetCount();
1305   TimeDuration latestPredictedUseForOverflow = 0;
1306   if (mIndex.Length() > uint32_t(maxBlocks)) {
1307     // Try to trim back the cache to its desired maximum size. The cache may
1308     // have overflowed simply due to data being received when we have
1309     // no blocks in the main part of the cache that are free or lower
1310     // priority than the new data. The cache can also be overflowing because
1311     // the media.cache_size preference was reduced.
1312     // First, figure out what the least valuable block in the cache overflow
1313     // is. We don't want to replace any blocks in the main part of the
1314     // cache whose expected time of next use is earlier or equal to that.
1315     // If we allow that, we can effectively end up discarding overflowing
1316     // blocks (by moving an overflowing block to the main part of the cache,
1317     // and then overwriting it with another overflowing block), and we try
1318     // to avoid that since it requires HTTP seeks.
1319     // We also use this loop to eliminate overflowing blocks from
1320     // freeBlockCount.
1321     for (int32_t blockIndex = mIndex.Length() - 1; blockIndex >= maxBlocks;
1322          --blockIndex) {
1323       if (IsBlockFree(blockIndex)) {
1324         // Don't count overflowing free blocks in our free block count
1325         --freeBlockCount;
1326         continue;
1327       }
1328       TimeDuration predictedUse = PredictNextUse(aLock, aNow, blockIndex);
1329       latestPredictedUseForOverflow =
1330           std::max(latestPredictedUseForOverflow, predictedUse);
1331     }
1332   } else {
1333     freeBlockCount += maxBlocks - mIndex.Length();
1334   }
1335 
1336   // Now try to move overflowing blocks to the main part of the cache.
1337   for (int32_t blockIndex = mIndex.Length() - 1; blockIndex >= maxBlocks;
1338        --blockIndex) {
1339     if (IsBlockFree(blockIndex)) continue;
1340 
1341     Block* block = &mIndex[blockIndex];
1342     // Try to relocate the block close to other blocks for the first stream.
1343     // There is no point in trying to make it close to other blocks in
1344     // *all* the streams it might belong to.
1345     int32_t destinationBlockIndex =
1346         FindReusableBlock(aLock, aNow, block->mOwners[0].mStream,
1347                           block->mOwners[0].mStreamBlock, maxBlocks);
1348     if (destinationBlockIndex < 0) {
1349       // Nowhere to place this overflow block. We won't be able to
1350       // place any more overflow blocks.
1351       break;
1352     }
1353 
1354     // Don't evict |destinationBlockIndex| if it is within [cur, end) otherwise
1355     // a new channel will be opened to download this block again which is bad.
1356     bool inCurrentCachedRange = false;
1357     for (BlockOwner& owner : mIndex[destinationBlockIndex].mOwners) {
1358       MediaCacheStream* stream = owner.mStream;
1359       int64_t end = OffsetToBlockIndexUnchecked(
1360           stream->GetCachedDataEndInternal(aLock, stream->mStreamOffset));
1361       int64_t cur = OffsetToBlockIndexUnchecked(stream->mStreamOffset);
1362       if (cur <= owner.mStreamBlock && owner.mStreamBlock < end) {
1363         inCurrentCachedRange = true;
1364         break;
1365       }
1366     }
1367     if (inCurrentCachedRange) {
1368       continue;
1369     }
1370 
1371     if (IsBlockFree(destinationBlockIndex) ||
1372         PredictNextUse(aLock, aNow, destinationBlockIndex) >
1373             latestPredictedUseForOverflow) {
1374       // Reuse blocks in the main part of the cache that are less useful than
1375       // the least useful overflow blocks
1376 
1377       nsresult rv = mBlockCache->MoveBlock(blockIndex, destinationBlockIndex);
1378 
1379       if (NS_SUCCEEDED(rv)) {
1380         // We successfully copied the file data.
1381         LOG("Swapping blocks %d and %d (trimming cache)", blockIndex,
1382             destinationBlockIndex);
1383         // Swapping the block metadata here lets us maintain the
1384         // correct positions in the linked lists
1385         SwapBlocks(aLock, blockIndex, destinationBlockIndex);
1386         // Free the overflowing block even if the copy failed.
1387         LOG("Released block %d (trimming cache)", blockIndex);
1388         FreeBlock(aLock, blockIndex);
1389       }
1390     } else {
1391       LOG("Could not trim cache block %d (destination %d, "
1392           "predicted next use %f, latest predicted use for overflow %f",
1393           blockIndex, destinationBlockIndex,
1394           PredictNextUse(aLock, aNow, destinationBlockIndex).ToSeconds(),
1395           latestPredictedUseForOverflow.ToSeconds());
1396     }
1397   }
1398   // Try chopping back the array of cache entries and the cache file.
1399   Truncate();
1400   return freeBlockCount;
1401 }
1402 
DetermineActionsForStreams(AutoLock & aLock,const TimeStamp & aNow,nsTArray<StreamAction> & aActions,int32_t aFreeBlockCount)1403 void MediaCache::DetermineActionsForStreams(AutoLock& aLock,
1404                                             const TimeStamp& aNow,
1405                                             nsTArray<StreamAction>& aActions,
1406                                             int32_t aFreeBlockCount) {
1407   MOZ_ASSERT(sThread->IsOnCurrentThread());
1408 
1409   // Count the blocks allocated for readahead of non-seekable streams
1410   // (these blocks can't be freed but we don't want them to monopolize the
1411   // cache)
1412   int32_t nonSeekableReadaheadBlockCount = 0;
1413   for (uint32_t i = 0; i < mStreams.Length(); ++i) {
1414     MediaCacheStream* stream = mStreams[i];
1415     if (!stream->mIsTransportSeekable) {
1416       nonSeekableReadaheadBlockCount += stream->mReadaheadBlocks.GetCount();
1417     }
1418   }
1419 
1420   // If freeBlockCount is zero, then compute the latest of
1421   // the predicted next-uses for all blocks
1422   TimeDuration latestNextUse;
1423   const int32_t maxBlocks = mBlockCache->GetMaxBlocks(MediaCache::CacheSize());
1424   if (aFreeBlockCount == 0) {
1425     const int32_t reusableBlock =
1426         FindReusableBlock(aLock, aNow, nullptr, 0, maxBlocks);
1427     if (reusableBlock >= 0) {
1428       latestNextUse = PredictNextUse(aLock, aNow, reusableBlock);
1429     }
1430   }
1431 
1432   for (uint32_t i = 0; i < mStreams.Length(); ++i) {
1433     aActions.AppendElement(StreamAction{});
1434 
1435     MediaCacheStream* stream = mStreams[i];
1436     if (stream->mClosed) {
1437       LOG("Stream %p closed", stream);
1438       continue;
1439     }
1440 
1441     // We make decisions based on mSeekTarget when there is a pending seek.
1442     // Otherwise we will keep issuing seek requests until mChannelOffset
1443     // is changed by NotifyDataStarted() which is bad.
1444     const int64_t channelOffset = stream->mSeekTarget != -1
1445                                       ? stream->mSeekTarget
1446                                       : stream->mChannelOffset;
1447 
1448     // Figure out where we should be reading from. It's the first
1449     // uncached byte after the current mStreamOffset.
1450     const int64_t dataOffset =
1451         stream->GetCachedDataEndInternal(aLock, stream->mStreamOffset);
1452     MOZ_ASSERT(dataOffset >= 0);
1453 
1454     // Compute where we'd actually seek to to read at readOffset
1455     int64_t desiredOffset = dataOffset;
1456     if (stream->mIsTransportSeekable) {
1457       if (desiredOffset > channelOffset &&
1458           desiredOffset <= channelOffset + SEEK_VS_READ_THRESHOLD) {
1459         // Assume it's more efficient to just keep reading up to the
1460         // desired position instead of trying to seek
1461         desiredOffset = channelOffset;
1462       }
1463     } else {
1464       // We can't seek directly to the desired offset...
1465       if (channelOffset > desiredOffset) {
1466         // Reading forward won't get us anywhere, we need to go backwards.
1467         // Seek back to 0 (the client will reopen the stream) and then
1468         // read forward.
1469         NS_WARNING("Can't seek backwards, so seeking to 0");
1470         desiredOffset = 0;
1471         // Flush cached blocks out, since if this is a live stream
1472         // the cached data may be completely different next time we
1473         // read it. We have to assume that live streams don't
1474         // advertise themselves as being seekable...
1475         ReleaseStreamBlocks(aLock, stream);
1476       } else {
1477         // otherwise reading forward is looking good, so just stay where we
1478         // are and don't trigger a channel seek!
1479         desiredOffset = channelOffset;
1480       }
1481     }
1482 
1483     // Figure out if we should be reading data now or not. It's amazing
1484     // how complex this is, but each decision is simple enough.
1485     bool enableReading;
1486     if (stream->mStreamLength >= 0 && dataOffset >= stream->mStreamLength) {
1487       // We want data at the end of the stream, where there's nothing to
1488       // read. We don't want to try to read if we're suspended, because that
1489       // might create a new channel and seek unnecessarily (and incorrectly,
1490       // since HTTP doesn't allow seeking to the actual EOF), and we don't want
1491       // to suspend if we're not suspended and already reading at the end of
1492       // the stream, since there just might be more data than the server
1493       // advertised with Content-Length, and we may as well keep reading.
1494       // But we don't want to seek to the end of the stream if we're not
1495       // already there.
1496       LOG("Stream %p at end of stream", stream);
1497       enableReading =
1498           !stream->mCacheSuspended && stream->mStreamLength == channelOffset;
1499     } else if (desiredOffset < stream->mStreamOffset) {
1500       // We're reading to try to catch up to where the current stream
1501       // reader wants to be. Better not stop.
1502       LOG("Stream %p catching up", stream);
1503       enableReading = true;
1504     } else if (desiredOffset < stream->mStreamOffset + BLOCK_SIZE) {
1505       // The stream reader is waiting for us, or nearly so. Better feed it.
1506       LOG("Stream %p feeding reader", stream);
1507       enableReading = true;
1508     } else if (!stream->mIsTransportSeekable &&
1509                nonSeekableReadaheadBlockCount >=
1510                    maxBlocks * NONSEEKABLE_READAHEAD_MAX) {
1511       // This stream is not seekable and there are already too many blocks
1512       // being cached for readahead for nonseekable streams (which we can't
1513       // free). So stop reading ahead now.
1514       LOG("Stream %p throttling non-seekable readahead", stream);
1515       enableReading = false;
1516     } else if (mIndex.Length() > uint32_t(maxBlocks)) {
1517       // We're in the process of bringing the cache size back to the
1518       // desired limit, so don't bring in more data yet
1519       LOG("Stream %p throttling to reduce cache size", stream);
1520       enableReading = false;
1521     } else {
1522       TimeDuration predictedNewDataUse =
1523           PredictNextUseForIncomingData(aLock, stream);
1524 
1525       if (stream->mThrottleReadahead && stream->mCacheSuspended &&
1526           predictedNewDataUse.ToSeconds() > MediaCache::ResumeThreshold()) {
1527         // Don't need data for a while, so don't bother waking up the stream
1528         LOG("Stream %p avoiding wakeup since more data is not needed", stream);
1529         enableReading = false;
1530       } else if (stream->mThrottleReadahead &&
1531                  predictedNewDataUse.ToSeconds() >
1532                      MediaCache::ReadaheadLimit()) {
1533         // Don't read ahead more than this much
1534         LOG("Stream %p throttling to avoid reading ahead too far", stream);
1535         enableReading = false;
1536       } else if (aFreeBlockCount > 0) {
1537         // Free blocks in the cache, so keep reading
1538         LOG("Stream %p reading since there are free blocks", stream);
1539         enableReading = true;
1540       } else if (latestNextUse <= TimeDuration(0)) {
1541         // No reusable blocks, so can't read anything
1542         LOG("Stream %p throttling due to no reusable blocks", stream);
1543         enableReading = false;
1544       } else {
1545         // Read ahead if the data we expect to read is more valuable than
1546         // the least valuable block in the main part of the cache
1547         LOG("Stream %p predict next data in %f, current worst block is %f",
1548             stream, predictedNewDataUse.ToSeconds(), latestNextUse.ToSeconds());
1549         enableReading = predictedNewDataUse < latestNextUse;
1550       }
1551     }
1552 
1553     if (enableReading) {
1554       for (uint32_t j = 0; j < i; ++j) {
1555         MediaCacheStream* other = mStreams[j];
1556         if (other->mResourceID == stream->mResourceID && !other->mClosed &&
1557             !other->mClientSuspended && !other->mChannelEnded &&
1558             OffsetToBlockIndexUnchecked(other->mSeekTarget != -1
1559                                             ? other->mSeekTarget
1560                                             : other->mChannelOffset) ==
1561                 OffsetToBlockIndexUnchecked(desiredOffset)) {
1562           // This block is already going to be read by the other stream.
1563           // So don't try to read it from this stream as well.
1564           enableReading = false;
1565           LOG("Stream %p waiting on same block (%" PRId32 ") from stream %p",
1566               stream, OffsetToBlockIndexUnchecked(desiredOffset), other);
1567           break;
1568         }
1569       }
1570     }
1571 
1572     if (channelOffset != desiredOffset && enableReading) {
1573       // We need to seek now.
1574       NS_ASSERTION(stream->mIsTransportSeekable || desiredOffset == 0,
1575                    "Trying to seek in a non-seekable stream!");
1576       // Round seek offset down to the start of the block. This is essential
1577       // because we don't want to think we have part of a block already
1578       // in mPartialBlockBuffer.
1579       stream->mSeekTarget =
1580           OffsetToBlockIndexUnchecked(desiredOffset) * BLOCK_SIZE;
1581       aActions[i].mTag = StreamAction::SEEK;
1582       aActions[i].mResume = stream->mCacheSuspended;
1583       aActions[i].mSeekTarget = stream->mSeekTarget;
1584     } else if (enableReading && stream->mCacheSuspended) {
1585       aActions[i].mTag = StreamAction::RESUME;
1586     } else if (!enableReading && !stream->mCacheSuspended) {
1587       aActions[i].mTag = StreamAction::SUSPEND;
1588     }
1589     LOG("Stream %p, mCacheSuspended=%d, enableReading=%d, action=%s", stream,
1590         stream->mCacheSuspended, enableReading,
1591         aActions[i].mTag == StreamAction::SEEK      ? "SEEK"
1592         : aActions[i].mTag == StreamAction::RESUME  ? "RESUME"
1593         : aActions[i].mTag == StreamAction::SUSPEND ? "SUSPEND"
1594                                                     : "NONE");
1595   }
1596 }
1597 
QueueUpdate(AutoLock &)1598 void MediaCache::QueueUpdate(AutoLock&) {
1599   // Queuing an update while we're in an update raises a high risk of
1600   // triggering endless events
1601   NS_ASSERTION(!mInUpdate, "Queuing an update while we're in an update");
1602   if (mUpdateQueued) {
1603     return;
1604   }
1605   mUpdateQueued = true;
1606   sThread->Dispatch(NS_NewRunnableFunction(
1607       "MediaCache::QueueUpdate", [self = RefPtr<MediaCache>(this)]() mutable {
1608         self->Update();
1609         // Ensure MediaCache is deleted on the main thread.
1610         NS_ReleaseOnMainThread("UpdateEvent::mMediaCache", self.forget());
1611       }));
1612 }
1613 
QueueSuspendedStatusUpdate(AutoLock &,int64_t aResourceID)1614 void MediaCache::QueueSuspendedStatusUpdate(AutoLock&, int64_t aResourceID) {
1615   if (!mSuspendedStatusToNotify.Contains(aResourceID)) {
1616     mSuspendedStatusToNotify.AppendElement(aResourceID);
1617   }
1618 }
1619 
1620 #ifdef DEBUG_VERIFY_CACHE
Verify(AutoLock &)1621 void MediaCache::Verify(AutoLock&) {
1622   mFreeBlocks.Verify();
1623   for (uint32_t i = 0; i < mStreams.Length(); ++i) {
1624     MediaCacheStream* stream = mStreams[i];
1625     stream->mReadaheadBlocks.Verify();
1626     stream->mPlayedBlocks.Verify();
1627     stream->mMetadataBlocks.Verify();
1628 
1629     // Verify that the readahead blocks are listed in stream block order
1630     int32_t block = stream->mReadaheadBlocks.GetFirstBlock();
1631     int32_t lastStreamBlock = -1;
1632     while (block >= 0) {
1633       uint32_t j = 0;
1634       while (mIndex[block].mOwners[j].mStream != stream) {
1635         ++j;
1636       }
1637       int32_t nextStreamBlock = int32_t(mIndex[block].mOwners[j].mStreamBlock);
1638       NS_ASSERTION(lastStreamBlock < nextStreamBlock,
1639                    "Blocks not increasing in readahead stream");
1640       lastStreamBlock = nextStreamBlock;
1641       block = stream->mReadaheadBlocks.GetNextBlock(block);
1642     }
1643   }
1644 }
1645 #endif
1646 
InsertReadaheadBlock(AutoLock & aLock,BlockOwner * aBlockOwner,int32_t aBlockIndex)1647 void MediaCache::InsertReadaheadBlock(AutoLock& aLock, BlockOwner* aBlockOwner,
1648                                       int32_t aBlockIndex) {
1649   // Find the last block whose stream block is before aBlockIndex's
1650   // stream block, and insert after it
1651   MediaCacheStream* stream = aBlockOwner->mStream;
1652   int32_t readaheadIndex = stream->mReadaheadBlocks.GetLastBlock();
1653   while (readaheadIndex >= 0) {
1654     BlockOwner* bo = GetBlockOwner(aLock, readaheadIndex, stream);
1655     NS_ASSERTION(bo, "stream must own its blocks");
1656     if (bo->mStreamBlock < aBlockOwner->mStreamBlock) {
1657       stream->mReadaheadBlocks.AddAfter(aBlockIndex, readaheadIndex);
1658       return;
1659     }
1660     NS_ASSERTION(bo->mStreamBlock > aBlockOwner->mStreamBlock,
1661                  "Duplicated blocks??");
1662     readaheadIndex = stream->mReadaheadBlocks.GetPrevBlock(readaheadIndex);
1663   }
1664 
1665   stream->mReadaheadBlocks.AddFirstBlock(aBlockIndex);
1666   Verify(aLock);
1667 }
1668 
AllocateAndWriteBlock(AutoLock & aLock,MediaCacheStream * aStream,int32_t aStreamBlockIndex,Span<const uint8_t> aData1,Span<const uint8_t> aData2)1669 void MediaCache::AllocateAndWriteBlock(AutoLock& aLock,
1670                                        MediaCacheStream* aStream,
1671                                        int32_t aStreamBlockIndex,
1672                                        Span<const uint8_t> aData1,
1673                                        Span<const uint8_t> aData2) {
1674   MOZ_ASSERT(sThread->IsOnCurrentThread());
1675 
1676   // Remove all cached copies of this block
1677   ResourceStreamIterator iter(this, aStream->mResourceID);
1678   while (MediaCacheStream* stream = iter.Next(aLock)) {
1679     while (aStreamBlockIndex >= int32_t(stream->mBlocks.Length())) {
1680       stream->mBlocks.AppendElement(-1);
1681     }
1682     if (stream->mBlocks[aStreamBlockIndex] >= 0) {
1683       // We no longer want to own this block
1684       int32_t globalBlockIndex = stream->mBlocks[aStreamBlockIndex];
1685       LOG("Released block %d from stream %p block %d(%" PRId64 ")",
1686           globalBlockIndex, stream, aStreamBlockIndex,
1687           aStreamBlockIndex * BLOCK_SIZE);
1688       RemoveBlockOwner(aLock, globalBlockIndex, stream);
1689     }
1690   }
1691 
1692   // Extend the mBlocks array as necessary
1693 
1694   TimeStamp now = TimeStamp::Now();
1695   int32_t blockIndex =
1696       FindBlockForIncomingData(aLock, now, aStream, aStreamBlockIndex);
1697   if (blockIndex >= 0) {
1698     FreeBlock(aLock, blockIndex);
1699 
1700     Block* block = &mIndex[blockIndex];
1701     LOG("Allocated block %d to stream %p block %d(%" PRId64 ")", blockIndex,
1702         aStream, aStreamBlockIndex, aStreamBlockIndex * BLOCK_SIZE);
1703 
1704     ResourceStreamIterator iter(this, aStream->mResourceID);
1705     while (MediaCacheStream* stream = iter.Next(aLock)) {
1706       BlockOwner* bo = block->mOwners.AppendElement();
1707       if (!bo) {
1708         // Roll back mOwners if any allocation fails.
1709         block->mOwners.Clear();
1710         return;
1711       }
1712       bo->mStream = stream;
1713     }
1714 
1715     if (block->mOwners.IsEmpty()) {
1716       // This happens when all streams with the resource id are closed. We can
1717       // just return here now and discard the data.
1718       return;
1719     }
1720 
1721     // Tell each stream using this resource about the new block.
1722     for (auto& bo : block->mOwners) {
1723       bo.mStreamBlock = aStreamBlockIndex;
1724       bo.mLastUseTime = now;
1725       bo.mStream->mBlocks[aStreamBlockIndex] = blockIndex;
1726       if (aStreamBlockIndex * BLOCK_SIZE < bo.mStream->mStreamOffset) {
1727         bo.mClass = PLAYED_BLOCK;
1728         // This must be the most-recently-used block, since we
1729         // marked it as used now (which may be slightly bogus, but we'll
1730         // treat it as used for simplicity).
1731         GetListForBlock(aLock, &bo)->AddFirstBlock(blockIndex);
1732         Verify(aLock);
1733       } else {
1734         // This may not be the latest readahead block, although it usually
1735         // will be. We may have to scan for the right place to insert
1736         // the block in the list.
1737         bo.mClass = READAHEAD_BLOCK;
1738         InsertReadaheadBlock(aLock, &bo, blockIndex);
1739       }
1740     }
1741 
1742     // Invariant: block->mOwners.IsEmpty() iff we can find an entry
1743     // in mFreeBlocks for a given blockIndex.
1744     MOZ_DIAGNOSTIC_ASSERT(!block->mOwners.IsEmpty());
1745     mFreeBlocks.RemoveBlock(blockIndex);
1746 
1747     nsresult rv = mBlockCache->WriteBlock(blockIndex, aData1, aData2);
1748     if (NS_FAILED(rv)) {
1749       LOG("Released block %d from stream %p block %d(%" PRId64 ")", blockIndex,
1750           aStream, aStreamBlockIndex, aStreamBlockIndex * BLOCK_SIZE);
1751       FreeBlock(aLock, blockIndex);
1752     }
1753   }
1754 
1755   // Queue an Update since the cache state has changed (for example
1756   // we might want to stop loading because the cache is full)
1757   QueueUpdate(aLock);
1758 }
1759 
OpenStream(AutoLock & aLock,MediaCacheStream * aStream,bool aIsClone)1760 void MediaCache::OpenStream(AutoLock& aLock, MediaCacheStream* aStream,
1761                             bool aIsClone) {
1762   LOG("Stream %p opened, aIsClone=%d, mCacheSuspended=%d, "
1763       "mDidNotifyDataEnded=%d",
1764       aStream, aIsClone, aStream->mCacheSuspended,
1765       aStream->mDidNotifyDataEnded);
1766   mStreams.AppendElement(aStream);
1767 
1768   // A cloned stream should've got the ID from its original.
1769   if (!aIsClone) {
1770     MOZ_ASSERT(aStream->mResourceID == 0, "mResourceID has been initialized.");
1771     aStream->mResourceID = AllocateResourceID(aLock);
1772   }
1773 
1774   // We should have a valid ID now no matter it is cloned or not.
1775   MOZ_ASSERT(aStream->mResourceID > 0, "mResourceID is invalid");
1776 
1777   // Queue an update since a new stream has been opened.
1778   QueueUpdate(aLock);
1779 }
1780 
ReleaseStream(AutoLock &,MediaCacheStream * aStream)1781 void MediaCache::ReleaseStream(AutoLock&, MediaCacheStream* aStream) {
1782   MOZ_ASSERT(OwnerThread()->IsOnCurrentThread());
1783   LOG("Stream %p closed", aStream);
1784   mStreams.RemoveElement(aStream);
1785   // The caller needs to call QueueUpdate() to re-run Update().
1786 }
1787 
ReleaseStreamBlocks(AutoLock & aLock,MediaCacheStream * aStream)1788 void MediaCache::ReleaseStreamBlocks(AutoLock& aLock,
1789                                      MediaCacheStream* aStream) {
1790   // XXX scanning the entire stream doesn't seem great, if not much of it
1791   // is cached, but the only easy alternative is to scan the entire cache
1792   // which isn't better
1793   uint32_t length = aStream->mBlocks.Length();
1794   for (uint32_t i = 0; i < length; ++i) {
1795     int32_t blockIndex = aStream->mBlocks[i];
1796     if (blockIndex >= 0) {
1797       LOG("Released block %d from stream %p block %d(%" PRId64 ")", blockIndex,
1798           aStream, i, i * BLOCK_SIZE);
1799       RemoveBlockOwner(aLock, blockIndex, aStream);
1800     }
1801   }
1802 }
1803 
Truncate()1804 void MediaCache::Truncate() {
1805   uint32_t end;
1806   for (end = mIndex.Length(); end > 0; --end) {
1807     if (!IsBlockFree(end - 1)) break;
1808     mFreeBlocks.RemoveBlock(end - 1);
1809   }
1810 
1811   if (end < mIndex.Length()) {
1812     mIndex.TruncateLength(end);
1813     // XXX We could truncate the cache file here, but we don't seem
1814     // to have a cross-platform API for doing that. At least when all
1815     // streams are closed we shut down the cache, which erases the
1816     // file at that point.
1817   }
1818 }
1819 
NoteBlockUsage(AutoLock & aLock,MediaCacheStream * aStream,int32_t aBlockIndex,int64_t aStreamOffset,MediaCacheStream::ReadMode aMode,TimeStamp aNow)1820 void MediaCache::NoteBlockUsage(AutoLock& aLock, MediaCacheStream* aStream,
1821                                 int32_t aBlockIndex, int64_t aStreamOffset,
1822                                 MediaCacheStream::ReadMode aMode,
1823                                 TimeStamp aNow) {
1824   if (aBlockIndex < 0) {
1825     // this block is not in the cache yet
1826     return;
1827   }
1828 
1829   BlockOwner* bo = GetBlockOwner(aLock, aBlockIndex, aStream);
1830   if (!bo) {
1831     // this block is not in the cache yet
1832     return;
1833   }
1834 
1835   // The following check has to be <= because the stream offset has
1836   // not yet been updated for the data read from this block
1837   NS_ASSERTION(bo->mStreamBlock * BLOCK_SIZE <= aStreamOffset,
1838                "Using a block that's behind the read position?");
1839 
1840   GetListForBlock(aLock, bo)->RemoveBlock(aBlockIndex);
1841   bo->mClass =
1842       (aMode == MediaCacheStream::MODE_METADATA || bo->mClass == METADATA_BLOCK)
1843           ? METADATA_BLOCK
1844           : PLAYED_BLOCK;
1845   // Since this is just being used now, it can definitely be at the front
1846   // of mMetadataBlocks or mPlayedBlocks
1847   GetListForBlock(aLock, bo)->AddFirstBlock(aBlockIndex);
1848   bo->mLastUseTime = aNow;
1849   Verify(aLock);
1850 }
1851 
NoteSeek(AutoLock & aLock,MediaCacheStream * aStream,int64_t aOldOffset)1852 void MediaCache::NoteSeek(AutoLock& aLock, MediaCacheStream* aStream,
1853                           int64_t aOldOffset) {
1854   if (aOldOffset < aStream->mStreamOffset) {
1855     // We seeked forward. Convert blocks from readahead to played.
1856     // Any readahead block that intersects the seeked-over range must
1857     // be converted.
1858     int32_t blockIndex = OffsetToBlockIndex(aOldOffset);
1859     if (blockIndex < 0) {
1860       return;
1861     }
1862     int32_t endIndex =
1863         std::min(OffsetToBlockIndex(aStream->mStreamOffset + (BLOCK_SIZE - 1)),
1864                  int32_t(aStream->mBlocks.Length()));
1865     if (endIndex < 0) {
1866       return;
1867     }
1868     TimeStamp now = TimeStamp::Now();
1869     while (blockIndex < endIndex) {
1870       int32_t cacheBlockIndex = aStream->mBlocks[blockIndex];
1871       if (cacheBlockIndex >= 0) {
1872         // Marking the block used may not be exactly what we want but
1873         // it's simple
1874         NoteBlockUsage(aLock, aStream, cacheBlockIndex, aStream->mStreamOffset,
1875                        MediaCacheStream::MODE_PLAYBACK, now);
1876       }
1877       ++blockIndex;
1878     }
1879   } else {
1880     // We seeked backward. Convert from played to readahead.
1881     // Any played block that is entirely after the start of the seeked-over
1882     // range must be converted.
1883     int32_t blockIndex =
1884         OffsetToBlockIndex(aStream->mStreamOffset + (BLOCK_SIZE - 1));
1885     if (blockIndex < 0) {
1886       return;
1887     }
1888     int32_t endIndex =
1889         std::min(OffsetToBlockIndex(aOldOffset + (BLOCK_SIZE - 1)),
1890                  int32_t(aStream->mBlocks.Length()));
1891     if (endIndex < 0) {
1892       return;
1893     }
1894     while (blockIndex < endIndex) {
1895       MOZ_ASSERT(endIndex > 0);
1896       int32_t cacheBlockIndex = aStream->mBlocks[endIndex - 1];
1897       if (cacheBlockIndex >= 0) {
1898         BlockOwner* bo = GetBlockOwner(aLock, cacheBlockIndex, aStream);
1899         NS_ASSERTION(bo, "Stream doesn't own its blocks?");
1900         if (bo->mClass == PLAYED_BLOCK) {
1901           aStream->mPlayedBlocks.RemoveBlock(cacheBlockIndex);
1902           bo->mClass = READAHEAD_BLOCK;
1903           // Adding this as the first block is sure to be OK since
1904           // this must currently be the earliest readahead block
1905           // (that's why we're proceeding backwards from the end of
1906           // the seeked range to the start)
1907           aStream->mReadaheadBlocks.AddFirstBlock(cacheBlockIndex);
1908           Verify(aLock);
1909         }
1910       }
1911       --endIndex;
1912     }
1913   }
1914 }
1915 
NotifyLoadID(uint32_t aLoadID)1916 void MediaCacheStream::NotifyLoadID(uint32_t aLoadID) {
1917   MOZ_ASSERT(aLoadID > 0);
1918 
1919   nsCOMPtr<nsIRunnable> r = NS_NewRunnableFunction(
1920       "MediaCacheStream::NotifyLoadID",
1921       [client = RefPtr<ChannelMediaResource>(mClient), this, aLoadID]() {
1922         AutoLock lock(mMediaCache->Monitor());
1923         mLoadID = aLoadID;
1924       });
1925   OwnerThread()->Dispatch(r.forget());
1926 }
1927 
NotifyDataStartedInternal(uint32_t aLoadID,int64_t aOffset,bool aSeekable,int64_t aLength)1928 void MediaCacheStream::NotifyDataStartedInternal(uint32_t aLoadID,
1929                                                  int64_t aOffset,
1930                                                  bool aSeekable,
1931                                                  int64_t aLength) {
1932   MOZ_ASSERT(OwnerThread()->IsOnCurrentThread());
1933   MOZ_ASSERT(aLoadID > 0);
1934   LOG("Stream %p DataStarted: %" PRId64 " aLoadID=%u aLength=%" PRId64, this,
1935       aOffset, aLoadID, aLength);
1936 
1937   AutoLock lock(mMediaCache->Monitor());
1938   NS_WARNING_ASSERTION(aOffset == mSeekTarget || aOffset == mChannelOffset,
1939                        "Server is giving us unexpected offset");
1940   MOZ_ASSERT(aOffset >= 0);
1941   if (aLength >= 0) {
1942     mStreamLength = aLength;
1943   }
1944   mChannelOffset = aOffset;
1945   if (mStreamLength >= 0) {
1946     // If we started reading at a certain offset, then for sure
1947     // the stream is at least that long.
1948     mStreamLength = std::max(mStreamLength, mChannelOffset);
1949   }
1950   mLoadID = aLoadID;
1951 
1952   MOZ_ASSERT(aOffset == 0 || aSeekable,
1953              "channel offset must be zero when we become non-seekable");
1954   mIsTransportSeekable = aSeekable;
1955   // Queue an Update since we may change our strategy for dealing
1956   // with this stream
1957   mMediaCache->QueueUpdate(lock);
1958 
1959   // Reset mSeekTarget since the seek is completed so MediaCache::Update() will
1960   // make decisions based on mChannelOffset instead of mSeekTarget.
1961   mSeekTarget = -1;
1962 
1963   // Reset these flags since a new load has begun.
1964   mChannelEnded = false;
1965   mDidNotifyDataEnded = false;
1966 
1967   UpdateDownloadStatistics(lock);
1968 }
1969 
NotifyDataStarted(uint32_t aLoadID,int64_t aOffset,bool aSeekable,int64_t aLength)1970 void MediaCacheStream::NotifyDataStarted(uint32_t aLoadID, int64_t aOffset,
1971                                          bool aSeekable, int64_t aLength) {
1972   MOZ_ASSERT(NS_IsMainThread());
1973   MOZ_ASSERT(aLoadID > 0);
1974 
1975   nsCOMPtr<nsIRunnable> r = NS_NewRunnableFunction(
1976       "MediaCacheStream::NotifyDataStarted",
1977       [=, client = RefPtr<ChannelMediaResource>(mClient)]() {
1978         NotifyDataStartedInternal(aLoadID, aOffset, aSeekable, aLength);
1979       });
1980   OwnerThread()->Dispatch(r.forget());
1981 }
1982 
NotifyDataReceived(uint32_t aLoadID,uint32_t aCount,const uint8_t * aData)1983 void MediaCacheStream::NotifyDataReceived(uint32_t aLoadID, uint32_t aCount,
1984                                           const uint8_t* aData) {
1985   MOZ_ASSERT(OwnerThread()->IsOnCurrentThread());
1986   MOZ_ASSERT(aLoadID > 0);
1987 
1988   AutoLock lock(mMediaCache->Monitor());
1989   if (mClosed) {
1990     // Nothing to do if the stream is closed.
1991     return;
1992   }
1993 
1994   LOG("Stream %p DataReceived at %" PRId64 " count=%u aLoadID=%u", this,
1995       mChannelOffset, aCount, aLoadID);
1996 
1997   if (mLoadID != aLoadID) {
1998     // mChannelOffset is updated to a new position when loading a new channel.
1999     // We should discard the data coming from the old channel so it won't be
2000     // stored to the wrong positoin.
2001     return;
2002   }
2003 
2004   mDownloadStatistics.AddBytes(aCount);
2005 
2006   // True if we commit any blocks to the cache.
2007   bool cacheUpdated = false;
2008 
2009   auto source = Span<const uint8_t>(aData, aCount);
2010 
2011   // We process the data one block (or part of a block) at a time
2012   while (!source.IsEmpty()) {
2013     // The data we've collected so far in the partial block.
2014     auto partial = Span<const uint8_t>(mPartialBlockBuffer.get(),
2015                                        OffsetInBlock(mChannelOffset));
2016 
2017     // The number of bytes needed to complete the partial block.
2018     size_t remaining = BLOCK_SIZE - partial.Length();
2019 
2020     if (source.Length() >= remaining) {
2021       // We have a whole block now to write it out.
2022       mMediaCache->AllocateAndWriteBlock(
2023           lock, this, OffsetToBlockIndexUnchecked(mChannelOffset), partial,
2024           source.First(remaining));
2025       source = source.From(remaining);
2026       mChannelOffset += remaining;
2027       cacheUpdated = true;
2028     } else {
2029       // The buffer to be filled in the partial block.
2030       auto buf = Span<uint8_t>(mPartialBlockBuffer.get() + partial.Length(),
2031                                remaining);
2032       memcpy(buf.Elements(), source.Elements(), source.Length());
2033       mChannelOffset += source.Length();
2034       break;
2035     }
2036   }
2037 
2038   MediaCache::ResourceStreamIterator iter(mMediaCache, mResourceID);
2039   while (MediaCacheStream* stream = iter.Next(lock)) {
2040     if (stream->mStreamLength >= 0) {
2041       // The stream is at least as long as what we've read
2042       stream->mStreamLength = std::max(stream->mStreamLength, mChannelOffset);
2043     }
2044     stream->mClient->CacheClientNotifyDataReceived();
2045   }
2046 
2047   // XXX it would be fairly easy to optimize things a lot more to
2048   // avoid waking up reader threads unnecessarily
2049   if (cacheUpdated) {
2050     // Wake up the reader who is waiting for the committed blocks.
2051     lock.NotifyAll();
2052   }
2053 }
2054 
FlushPartialBlockInternal(AutoLock & aLock,bool aNotifyAll)2055 void MediaCacheStream::FlushPartialBlockInternal(AutoLock& aLock,
2056                                                  bool aNotifyAll) {
2057   MOZ_ASSERT(OwnerThread()->IsOnCurrentThread());
2058 
2059   int32_t blockIndex = OffsetToBlockIndexUnchecked(mChannelOffset);
2060   int32_t blockOffset = OffsetInBlock(mChannelOffset);
2061   if (blockOffset > 0) {
2062     LOG("Stream %p writing partial block: [%d] bytes; "
2063         "mStreamOffset [%" PRId64 "] mChannelOffset[%" PRId64
2064         "] mStreamLength [%" PRId64 "] notifying: [%s]",
2065         this, blockOffset, mStreamOffset, mChannelOffset, mStreamLength,
2066         aNotifyAll ? "yes" : "no");
2067 
2068     // Write back the partial block
2069     memset(mPartialBlockBuffer.get() + blockOffset, 0,
2070            BLOCK_SIZE - blockOffset);
2071     auto data = Span<const uint8_t>(mPartialBlockBuffer.get(), BLOCK_SIZE);
2072     mMediaCache->AllocateAndWriteBlock(aLock, this, blockIndex, data);
2073   }
2074 
2075   // |mChannelOffset == 0| means download ends with no bytes received.
2076   // We should also wake up those readers who are waiting for data
2077   // that will never come.
2078   if ((blockOffset > 0 || mChannelOffset == 0) && aNotifyAll) {
2079     // Wake up readers who may be waiting for this data
2080     aLock.NotifyAll();
2081   }
2082 }
2083 
UpdateDownloadStatistics(AutoLock &)2084 void MediaCacheStream::UpdateDownloadStatistics(AutoLock&) {
2085   if (mChannelEnded || mClientSuspended) {
2086     mDownloadStatistics.Stop();
2087   } else {
2088     mDownloadStatistics.Start();
2089   }
2090 }
2091 
NotifyDataEndedInternal(uint32_t aLoadID,nsresult aStatus)2092 void MediaCacheStream::NotifyDataEndedInternal(uint32_t aLoadID,
2093                                                nsresult aStatus) {
2094   MOZ_ASSERT(OwnerThread()->IsOnCurrentThread());
2095   AutoLock lock(mMediaCache->Monitor());
2096 
2097   if (mClosed || aLoadID != mLoadID) {
2098     // Nothing to do if the stream is closed or a new load has begun.
2099     return;
2100   }
2101 
2102   // It is prudent to update channel/cache status before calling
2103   // CacheClientNotifyDataEnded() which will read |mChannelEnded|.
2104   mChannelEnded = true;
2105   mMediaCache->QueueUpdate(lock);
2106 
2107   UpdateDownloadStatistics(lock);
2108 
2109   if (NS_FAILED(aStatus)) {
2110     // Notify the client about this network error.
2111     mDidNotifyDataEnded = true;
2112     mNotifyDataEndedStatus = aStatus;
2113     mClient->CacheClientNotifyDataEnded(aStatus);
2114     // Wake up the readers so they can fail gracefully.
2115     lock.NotifyAll();
2116     return;
2117   }
2118 
2119   // Note we don't flush the partial block when download ends abnormally for
2120   // the padding zeros will give wrong data to other streams.
2121   FlushPartialBlockInternal(lock, true);
2122 
2123   MediaCache::ResourceStreamIterator iter(mMediaCache, mResourceID);
2124   while (MediaCacheStream* stream = iter.Next(lock)) {
2125     // We read the whole stream, so remember the true length
2126     stream->mStreamLength = mChannelOffset;
2127     if (!stream->mDidNotifyDataEnded) {
2128       stream->mDidNotifyDataEnded = true;
2129       stream->mNotifyDataEndedStatus = aStatus;
2130       stream->mClient->CacheClientNotifyDataEnded(aStatus);
2131     }
2132   }
2133 }
2134 
NotifyDataEnded(uint32_t aLoadID,nsresult aStatus)2135 void MediaCacheStream::NotifyDataEnded(uint32_t aLoadID, nsresult aStatus) {
2136   MOZ_ASSERT(NS_IsMainThread());
2137   MOZ_ASSERT(aLoadID > 0);
2138 
2139   RefPtr<ChannelMediaResource> client = mClient;
2140   nsCOMPtr<nsIRunnable> r = NS_NewRunnableFunction(
2141       "MediaCacheStream::NotifyDataEnded", [client, this, aLoadID, aStatus]() {
2142         NotifyDataEndedInternal(aLoadID, aStatus);
2143       });
2144   OwnerThread()->Dispatch(r.forget());
2145 }
2146 
NotifyClientSuspended(bool aSuspended)2147 void MediaCacheStream::NotifyClientSuspended(bool aSuspended) {
2148   MOZ_ASSERT(NS_IsMainThread());
2149 
2150   RefPtr<ChannelMediaResource> client = mClient;
2151   nsCOMPtr<nsIRunnable> r = NS_NewRunnableFunction(
2152       "MediaCacheStream::NotifyClientSuspended", [client, this, aSuspended]() {
2153         AutoLock lock(mMediaCache->Monitor());
2154         if (!mClosed && mClientSuspended != aSuspended) {
2155           mClientSuspended = aSuspended;
2156           // mClientSuspended changes the decision of reading streams.
2157           mMediaCache->QueueUpdate(lock);
2158           UpdateDownloadStatistics(lock);
2159           if (mClientSuspended) {
2160             // Download is suspended. Wake up the readers that might be able to
2161             // get data from the partial block.
2162             lock.NotifyAll();
2163           }
2164         }
2165       });
2166   OwnerThread()->Dispatch(r.forget());
2167 }
2168 
NotifyResume()2169 void MediaCacheStream::NotifyResume() {
2170   MOZ_ASSERT(NS_IsMainThread());
2171 
2172   nsCOMPtr<nsIRunnable> r = NS_NewRunnableFunction(
2173       "MediaCacheStream::NotifyResume",
2174       [this, client = RefPtr<ChannelMediaResource>(mClient)]() {
2175         AutoLock lock(mMediaCache->Monitor());
2176         if (mClosed) {
2177           return;
2178         }
2179         // Don't resume download if we are already at the end of the stream for
2180         // seek will fail and be wasted anyway.
2181         int64_t offset = mSeekTarget != -1 ? mSeekTarget : mChannelOffset;
2182         if (mStreamLength < 0 || offset < mStreamLength) {
2183           mClient->CacheClientSeek(offset, false);
2184           // DownloadResumed() will be notified when a new channel is opened.
2185         }
2186         // The channel remains dead. If we want to read some other data in the
2187         // future, CacheClientSeek() will be called to reopen the channel.
2188       });
2189   OwnerThread()->Dispatch(r.forget());
2190 }
2191 
~MediaCacheStream()2192 MediaCacheStream::~MediaCacheStream() {
2193   MOZ_ASSERT(NS_IsMainThread(), "Only call on main thread");
2194   MOZ_ASSERT(!mPinCount, "Unbalanced Pin");
2195   MOZ_ASSERT(!mMediaCache || mClosed);
2196 
2197   uint32_t lengthKb = uint32_t(std::min(
2198       std::max(mStreamLength, int64_t(0)) / 1024, int64_t(UINT32_MAX)));
2199   LOG("MediaCacheStream::~MediaCacheStream(this=%p) "
2200       "MEDIACACHESTREAM_LENGTH_KB=%" PRIu32,
2201       this, lengthKb);
2202 }
2203 
AreAllStreamsForResourceSuspended(AutoLock & aLock)2204 bool MediaCacheStream::AreAllStreamsForResourceSuspended(AutoLock& aLock) {
2205   MOZ_ASSERT(!NS_IsMainThread());
2206 
2207   MediaCache::ResourceStreamIterator iter(mMediaCache, mResourceID);
2208   // Look for a stream that's able to read the data we need
2209   int64_t dataOffset = -1;
2210   while (MediaCacheStream* stream = iter.Next(aLock)) {
2211     if (stream->mCacheSuspended || stream->mChannelEnded || stream->mClosed) {
2212       continue;
2213     }
2214     if (dataOffset < 0) {
2215       dataOffset = GetCachedDataEndInternal(aLock, mStreamOffset);
2216     }
2217     // Ignore streams that are reading beyond the data we need
2218     if (stream->mChannelOffset > dataOffset) {
2219       continue;
2220     }
2221     return false;
2222   }
2223 
2224   return true;
2225 }
2226 
Close()2227 RefPtr<GenericPromise> MediaCacheStream::Close() {
2228   MOZ_ASSERT(NS_IsMainThread());
2229   if (!mMediaCache) {
2230     return GenericPromise::CreateAndResolve(true, __func__);
2231   }
2232 
2233   return InvokeAsync(OwnerThread(), "MediaCacheStream::Close",
2234                      [this, client = RefPtr<ChannelMediaResource>(mClient)] {
2235                        AutoLock lock(mMediaCache->Monitor());
2236                        CloseInternal(lock);
2237                        return GenericPromise::CreateAndResolve(true, __func__);
2238                      });
2239 }
2240 
CloseInternal(AutoLock & aLock)2241 void MediaCacheStream::CloseInternal(AutoLock& aLock) {
2242   MOZ_ASSERT(OwnerThread()->IsOnCurrentThread());
2243 
2244   if (mClosed) {
2245     return;
2246   }
2247 
2248   // Closing a stream will change the return value of
2249   // MediaCacheStream::AreAllStreamsForResourceSuspended as well as
2250   // ChannelMediaResource::IsSuspendedByCache. Let's notify it.
2251   mMediaCache->QueueSuspendedStatusUpdate(aLock, mResourceID);
2252 
2253   mClosed = true;
2254   mMediaCache->ReleaseStreamBlocks(aLock, this);
2255   mMediaCache->ReleaseStream(aLock, this);
2256   // Wake up any blocked readers
2257   aLock.NotifyAll();
2258 
2259   // Queue an Update since we may have created more free space.
2260   mMediaCache->QueueUpdate(aLock);
2261 }
2262 
Pin()2263 void MediaCacheStream::Pin() {
2264   MOZ_ASSERT(!NS_IsMainThread());
2265   AutoLock lock(mMediaCache->Monitor());
2266   ++mPinCount;
2267   // Queue an Update since we may no longer want to read more into the
2268   // cache, if this stream's block have become non-evictable
2269   mMediaCache->QueueUpdate(lock);
2270 }
2271 
Unpin()2272 void MediaCacheStream::Unpin() {
2273   MOZ_ASSERT(!NS_IsMainThread());
2274   AutoLock lock(mMediaCache->Monitor());
2275   NS_ASSERTION(mPinCount > 0, "Unbalanced Unpin");
2276   --mPinCount;
2277   // Queue an Update since we may be able to read more into the
2278   // cache, if this stream's block have become evictable
2279   mMediaCache->QueueUpdate(lock);
2280 }
2281 
GetLength() const2282 int64_t MediaCacheStream::GetLength() const {
2283   MOZ_ASSERT(!NS_IsMainThread());
2284   AutoLock lock(mMediaCache->Monitor());
2285   return mStreamLength;
2286 }
2287 
GetLengthAndOffset() const2288 MediaCacheStream::LengthAndOffset MediaCacheStream::GetLengthAndOffset() const {
2289   MOZ_ASSERT(NS_IsMainThread());
2290   AutoLock lock(mMediaCache->Monitor());
2291   return {mStreamLength, mChannelOffset};
2292 }
2293 
GetNextCachedData(int64_t aOffset)2294 int64_t MediaCacheStream::GetNextCachedData(int64_t aOffset) {
2295   MOZ_ASSERT(!NS_IsMainThread());
2296   AutoLock lock(mMediaCache->Monitor());
2297   return GetNextCachedDataInternal(lock, aOffset);
2298 }
2299 
GetCachedDataEnd(int64_t aOffset)2300 int64_t MediaCacheStream::GetCachedDataEnd(int64_t aOffset) {
2301   MOZ_ASSERT(!NS_IsMainThread());
2302   AutoLock lock(mMediaCache->Monitor());
2303   return GetCachedDataEndInternal(lock, aOffset);
2304 }
2305 
IsDataCachedToEndOfStream(int64_t aOffset)2306 bool MediaCacheStream::IsDataCachedToEndOfStream(int64_t aOffset) {
2307   MOZ_ASSERT(!NS_IsMainThread());
2308   AutoLock lock(mMediaCache->Monitor());
2309   if (mStreamLength < 0) return false;
2310   return GetCachedDataEndInternal(lock, aOffset) >= mStreamLength;
2311 }
2312 
GetCachedDataEndInternal(AutoLock &,int64_t aOffset)2313 int64_t MediaCacheStream::GetCachedDataEndInternal(AutoLock&, int64_t aOffset) {
2314   int32_t blockIndex = OffsetToBlockIndex(aOffset);
2315   if (blockIndex < 0) {
2316     return aOffset;
2317   }
2318   while (size_t(blockIndex) < mBlocks.Length() && mBlocks[blockIndex] != -1) {
2319     ++blockIndex;
2320   }
2321   int64_t result = blockIndex * BLOCK_SIZE;
2322   if (blockIndex == OffsetToBlockIndexUnchecked(mChannelOffset)) {
2323     // The block containing mChannelOffset may be partially read but not
2324     // yet committed to the main cache
2325     result = mChannelOffset;
2326   }
2327   if (mStreamLength >= 0) {
2328     // The last block in the cache may only be partially valid, so limit
2329     // the cached range to the stream length
2330     result = std::min(result, mStreamLength);
2331   }
2332   return std::max(result, aOffset);
2333 }
2334 
GetNextCachedDataInternal(AutoLock &,int64_t aOffset)2335 int64_t MediaCacheStream::GetNextCachedDataInternal(AutoLock&,
2336                                                     int64_t aOffset) {
2337   if (aOffset == mStreamLength) return -1;
2338 
2339   int32_t startBlockIndex = OffsetToBlockIndex(aOffset);
2340   if (startBlockIndex < 0) {
2341     return -1;
2342   }
2343   int32_t channelBlockIndex = OffsetToBlockIndexUnchecked(mChannelOffset);
2344 
2345   if (startBlockIndex == channelBlockIndex && aOffset < mChannelOffset) {
2346     // The block containing mChannelOffset is partially read, but not
2347     // yet committed to the main cache. aOffset lies in the partially
2348     // read portion, thus it is effectively cached.
2349     return aOffset;
2350   }
2351 
2352   if (size_t(startBlockIndex) >= mBlocks.Length()) return -1;
2353 
2354   // Is the current block cached?
2355   if (mBlocks[startBlockIndex] != -1) return aOffset;
2356 
2357   // Count the number of uncached blocks
2358   bool hasPartialBlock = OffsetInBlock(mChannelOffset) != 0;
2359   int32_t blockIndex = startBlockIndex + 1;
2360   while (true) {
2361     if ((hasPartialBlock && blockIndex == channelBlockIndex) ||
2362         (size_t(blockIndex) < mBlocks.Length() && mBlocks[blockIndex] != -1)) {
2363       // We at the incoming channel block, which has has data in it,
2364       // or are we at a cached block. Return index of block start.
2365       return blockIndex * BLOCK_SIZE;
2366     }
2367 
2368     // No more cached blocks?
2369     if (size_t(blockIndex) >= mBlocks.Length()) return -1;
2370 
2371     ++blockIndex;
2372   }
2373 
2374   MOZ_ASSERT_UNREACHABLE("Should return in loop");
2375   return -1;
2376 }
2377 
SetReadMode(ReadMode aMode)2378 void MediaCacheStream::SetReadMode(ReadMode aMode) {
2379   nsCOMPtr<nsIRunnable> r = NS_NewRunnableFunction(
2380       "MediaCacheStream::SetReadMode",
2381       [this, client = RefPtr<ChannelMediaResource>(mClient), aMode]() {
2382         AutoLock lock(mMediaCache->Monitor());
2383         if (!mClosed && mCurrentMode != aMode) {
2384           mCurrentMode = aMode;
2385           mMediaCache->QueueUpdate(lock);
2386         }
2387       });
2388   OwnerThread()->Dispatch(r.forget());
2389 }
2390 
SetPlaybackRate(uint32_t aBytesPerSecond)2391 void MediaCacheStream::SetPlaybackRate(uint32_t aBytesPerSecond) {
2392   MOZ_ASSERT(!NS_IsMainThread());
2393   MOZ_ASSERT(aBytesPerSecond > 0, "Zero playback rate not allowed");
2394 
2395   AutoLock lock(mMediaCache->Monitor());
2396   if (!mClosed && mPlaybackBytesPerSecond != aBytesPerSecond) {
2397     mPlaybackBytesPerSecond = aBytesPerSecond;
2398     mMediaCache->QueueUpdate(lock);
2399   }
2400 }
2401 
Seek(AutoLock & aLock,int64_t aOffset)2402 nsresult MediaCacheStream::Seek(AutoLock& aLock, int64_t aOffset) {
2403   MOZ_ASSERT(!NS_IsMainThread());
2404 
2405   if (!IsOffsetAllowed(aOffset)) {
2406     return NS_ERROR_ILLEGAL_VALUE;
2407   }
2408   if (mClosed) {
2409     return NS_ERROR_ABORT;
2410   }
2411 
2412   int64_t oldOffset = mStreamOffset;
2413   mStreamOffset = aOffset;
2414   LOG("Stream %p Seek to %" PRId64, this, mStreamOffset);
2415   mMediaCache->NoteSeek(aLock, this, oldOffset);
2416   mMediaCache->QueueUpdate(aLock);
2417   return NS_OK;
2418 }
2419 
ThrottleReadahead(bool bThrottle)2420 void MediaCacheStream::ThrottleReadahead(bool bThrottle) {
2421   MOZ_ASSERT(NS_IsMainThread());
2422 
2423   nsCOMPtr<nsIRunnable> r = NS_NewRunnableFunction(
2424       "MediaCacheStream::ThrottleReadahead",
2425       [client = RefPtr<ChannelMediaResource>(mClient), this, bThrottle]() {
2426         AutoLock lock(mMediaCache->Monitor());
2427         if (!mClosed && mThrottleReadahead != bThrottle) {
2428           LOGI("Stream %p ThrottleReadahead %d", this, bThrottle);
2429           mThrottleReadahead = bThrottle;
2430           mMediaCache->QueueUpdate(lock);
2431         }
2432       });
2433   OwnerThread()->Dispatch(r.forget());
2434 }
2435 
ReadPartialBlock(AutoLock &,int64_t aOffset,Span<char> aBuffer)2436 uint32_t MediaCacheStream::ReadPartialBlock(AutoLock&, int64_t aOffset,
2437                                             Span<char> aBuffer) {
2438   MOZ_ASSERT(IsOffsetAllowed(aOffset));
2439 
2440   if (OffsetToBlockIndexUnchecked(mChannelOffset) !=
2441           OffsetToBlockIndexUnchecked(aOffset) ||
2442       aOffset >= mChannelOffset) {
2443     // Not in the partial block or no data to read.
2444     return 0;
2445   }
2446 
2447   auto source = Span<const uint8_t>(
2448       mPartialBlockBuffer.get() + OffsetInBlock(aOffset),
2449       OffsetInBlock(mChannelOffset) - OffsetInBlock(aOffset));
2450   // We have |source.Length() <= BLOCK_SIZE < INT32_MAX| to guarantee
2451   // that |bytesToRead| can fit into a uint32_t.
2452   uint32_t bytesToRead = std::min(aBuffer.Length(), source.Length());
2453   memcpy(aBuffer.Elements(), source.Elements(), bytesToRead);
2454   return bytesToRead;
2455 }
2456 
ReadBlockFromCache(AutoLock & aLock,int64_t aOffset,Span<char> aBuffer,bool aNoteBlockUsage)2457 Result<uint32_t, nsresult> MediaCacheStream::ReadBlockFromCache(
2458     AutoLock& aLock, int64_t aOffset, Span<char> aBuffer,
2459     bool aNoteBlockUsage) {
2460   MOZ_ASSERT(IsOffsetAllowed(aOffset));
2461 
2462   // OffsetToBlockIndexUnchecked() is always non-negative.
2463   uint32_t index = OffsetToBlockIndexUnchecked(aOffset);
2464   int32_t cacheBlock = index < mBlocks.Length() ? mBlocks[index] : -1;
2465   if (cacheBlock < 0 || (mStreamLength >= 0 && aOffset >= mStreamLength)) {
2466     // Not in the cache.
2467     return 0;
2468   }
2469 
2470   if (aBuffer.Length() > size_t(BLOCK_SIZE)) {
2471     // Clamp the buffer to avoid overflow below since we will read at most
2472     // BLOCK_SIZE bytes.
2473     aBuffer = aBuffer.First(BLOCK_SIZE);
2474   }
2475 
2476   if (mStreamLength >= 0 &&
2477       int64_t(aBuffer.Length()) > mStreamLength - aOffset) {
2478     // Clamp reads to stream's length
2479     aBuffer = aBuffer.First(mStreamLength - aOffset);
2480   }
2481 
2482   // |BLOCK_SIZE - OffsetInBlock(aOffset)| <= BLOCK_SIZE
2483   int32_t bytesToRead =
2484       std::min<int32_t>(BLOCK_SIZE - OffsetInBlock(aOffset), aBuffer.Length());
2485   int32_t bytesRead = 0;
2486   nsresult rv = mMediaCache->ReadCacheFile(
2487       aLock, cacheBlock * BLOCK_SIZE + OffsetInBlock(aOffset),
2488       aBuffer.Elements(), bytesToRead, &bytesRead);
2489 
2490   // Ensure |cacheBlock * BLOCK_SIZE + OffsetInBlock(aOffset)| won't overflow.
2491   static_assert(INT64_MAX >= BLOCK_SIZE * (uint32_t(INT32_MAX) + 1),
2492                 "BLOCK_SIZE too large!");
2493 
2494   if (NS_FAILED(rv)) {
2495     nsCString name;
2496     GetErrorName(rv, name);
2497     LOGE("Stream %p ReadCacheFile failed, rv=%s", this, name.Data());
2498     return mozilla::Err(rv);
2499   }
2500 
2501   if (aNoteBlockUsage) {
2502     mMediaCache->NoteBlockUsage(aLock, this, cacheBlock, aOffset, mCurrentMode,
2503                                 TimeStamp::Now());
2504   }
2505 
2506   return bytesRead;
2507 }
2508 
Read(AutoLock & aLock,char * aBuffer,uint32_t aCount,uint32_t * aBytes)2509 nsresult MediaCacheStream::Read(AutoLock& aLock, char* aBuffer, uint32_t aCount,
2510                                 uint32_t* aBytes) {
2511   MOZ_ASSERT(!NS_IsMainThread());
2512 
2513   // Cache the offset in case it is changed again when we are waiting for the
2514   // monitor to be notified to avoid reading at the wrong position.
2515   auto streamOffset = mStreamOffset;
2516 
2517   // The buffer we are about to fill.
2518   auto buffer = Span<char>(aBuffer, aCount);
2519 
2520   // Read one block (or part of a block) at a time
2521   while (!buffer.IsEmpty()) {
2522     if (mClosed) {
2523       return NS_ERROR_ABORT;
2524     }
2525 
2526     if (!IsOffsetAllowed(streamOffset)) {
2527       LOGE("Stream %p invalid offset=%" PRId64, this, streamOffset);
2528       return NS_ERROR_ILLEGAL_VALUE;
2529     }
2530 
2531     if (mStreamLength >= 0 && streamOffset >= mStreamLength) {
2532       // Don't try to read beyond the end of the stream
2533       break;
2534     }
2535 
2536     Result<uint32_t, nsresult> rv = ReadBlockFromCache(
2537         aLock, streamOffset, buffer, true /* aNoteBlockUsage */);
2538     if (rv.isErr()) {
2539       return rv.unwrapErr();
2540     }
2541 
2542     uint32_t bytes = rv.unwrap();
2543     if (bytes > 0) {
2544       // Got data from the cache successfully. Read next block.
2545       streamOffset += bytes;
2546       buffer = buffer.From(bytes);
2547       continue;
2548     }
2549 
2550     // See if we can use the data in the partial block of any stream reading
2551     // this resource. Note we use the partial block only when it is completed,
2552     // that is reaching EOS.
2553     bool foundDataInPartialBlock = false;
2554     MediaCache::ResourceStreamIterator iter(mMediaCache, mResourceID);
2555     while (MediaCacheStream* stream = iter.Next(aLock)) {
2556       if (OffsetToBlockIndexUnchecked(stream->mChannelOffset) ==
2557               OffsetToBlockIndexUnchecked(streamOffset) &&
2558           stream->mChannelOffset == stream->mStreamLength) {
2559         uint32_t bytes = stream->ReadPartialBlock(aLock, streamOffset, buffer);
2560         streamOffset += bytes;
2561         buffer = buffer.From(bytes);
2562         foundDataInPartialBlock = true;
2563         break;
2564       }
2565     }
2566     if (foundDataInPartialBlock) {
2567       // Break for we've reached EOS.
2568       break;
2569     }
2570 
2571     if (mDidNotifyDataEnded && NS_FAILED(mNotifyDataEndedStatus)) {
2572       // Since download ends abnormally, there is no point in waiting for new
2573       // data to come. We will check the partial block to read as many bytes as
2574       // possible before exiting this function.
2575       bytes = ReadPartialBlock(aLock, streamOffset, buffer);
2576       streamOffset += bytes;
2577       buffer = buffer.From(bytes);
2578       break;
2579     }
2580 
2581     if (mStreamOffset != streamOffset) {
2582       // Update mStreamOffset before we drop the lock. We need to run
2583       // Update() again since stream reading strategy might have changed.
2584       mStreamOffset = streamOffset;
2585       mMediaCache->QueueUpdate(aLock);
2586     }
2587 
2588     // No data to read, so block
2589     aLock.Wait();
2590   }
2591 
2592   uint32_t count = buffer.Elements() - aBuffer;
2593   *aBytes = count;
2594   if (count == 0) {
2595     return NS_OK;
2596   }
2597 
2598   // Some data was read, so queue an update since block priorities may
2599   // have changed
2600   mMediaCache->QueueUpdate(aLock);
2601 
2602   LOG("Stream %p Read at %" PRId64 " count=%d", this, streamOffset - count,
2603       count);
2604   mStreamOffset = streamOffset;
2605   return NS_OK;
2606 }
2607 
ReadAt(int64_t aOffset,char * aBuffer,uint32_t aCount,uint32_t * aBytes)2608 nsresult MediaCacheStream::ReadAt(int64_t aOffset, char* aBuffer,
2609                                   uint32_t aCount, uint32_t* aBytes) {
2610   MOZ_ASSERT(!NS_IsMainThread());
2611   AutoLock lock(mMediaCache->Monitor());
2612   nsresult rv = Seek(lock, aOffset);
2613   if (NS_FAILED(rv)) return rv;
2614   return Read(lock, aBuffer, aCount, aBytes);
2615 }
2616 
ReadFromCache(char * aBuffer,int64_t aOffset,uint32_t aCount)2617 nsresult MediaCacheStream::ReadFromCache(char* aBuffer, int64_t aOffset,
2618                                          uint32_t aCount) {
2619   MOZ_ASSERT(!NS_IsMainThread());
2620   AutoLock lock(mMediaCache->Monitor());
2621 
2622   // The buffer we are about to fill.
2623   auto buffer = Span<char>(aBuffer, aCount);
2624 
2625   // Read one block (or part of a block) at a time
2626   int64_t streamOffset = aOffset;
2627   while (!buffer.IsEmpty()) {
2628     if (mClosed) {
2629       // We need to check |mClosed| in each iteration which might be changed
2630       // after calling |mMediaCache->ReadCacheFile|.
2631       return NS_ERROR_FAILURE;
2632     }
2633 
2634     if (!IsOffsetAllowed(streamOffset)) {
2635       LOGE("Stream %p invalid offset=%" PRId64, this, streamOffset);
2636       return NS_ERROR_ILLEGAL_VALUE;
2637     }
2638 
2639     Result<uint32_t, nsresult> rv =
2640         ReadBlockFromCache(lock, streamOffset, buffer);
2641     if (rv.isErr()) {
2642       return rv.unwrapErr();
2643     }
2644 
2645     uint32_t bytes = rv.unwrap();
2646     if (bytes > 0) {
2647       // Read data from the cache successfully. Let's try next block.
2648       streamOffset += bytes;
2649       buffer = buffer.From(bytes);
2650       continue;
2651     }
2652 
2653     // The partial block is our last chance to get data.
2654     bytes = ReadPartialBlock(lock, streamOffset, buffer);
2655     if (bytes < buffer.Length()) {
2656       // Not enough data to read.
2657       return NS_ERROR_FAILURE;
2658     }
2659 
2660     // Return for we've got all the requested bytes.
2661     return NS_OK;
2662   }
2663 
2664   return NS_OK;
2665 }
2666 
Init(int64_t aContentLength)2667 nsresult MediaCacheStream::Init(int64_t aContentLength) {
2668   NS_ASSERTION(NS_IsMainThread(), "Only call on main thread");
2669   MOZ_ASSERT(!mMediaCache, "Has been initialized.");
2670 
2671   if (aContentLength > 0) {
2672     uint32_t length = uint32_t(std::min(aContentLength, int64_t(UINT32_MAX)));
2673     LOG("MediaCacheStream::Init(this=%p) "
2674         "MEDIACACHESTREAM_NOTIFIED_LENGTH=%" PRIu32,
2675         this, length);
2676 
2677     mStreamLength = aContentLength;
2678   }
2679 
2680   mMediaCache = MediaCache::GetMediaCache(aContentLength, mIsPrivateBrowsing);
2681   if (!mMediaCache) {
2682     return NS_ERROR_FAILURE;
2683   }
2684 
2685   OwnerThread()->Dispatch(NS_NewRunnableFunction(
2686       "MediaCacheStream::Init",
2687       [this, res = RefPtr<ChannelMediaResource>(mClient)]() {
2688         AutoLock lock(mMediaCache->Monitor());
2689         mMediaCache->OpenStream(lock, this);
2690       }));
2691 
2692   return NS_OK;
2693 }
2694 
InitAsClone(MediaCacheStream * aOriginal)2695 void MediaCacheStream::InitAsClone(MediaCacheStream* aOriginal) {
2696   MOZ_ASSERT(!mMediaCache, "Has been initialized.");
2697   MOZ_ASSERT(aOriginal->mMediaCache, "Don't clone an uninitialized stream.");
2698 
2699   // Use the same MediaCache as our clone.
2700   mMediaCache = aOriginal->mMediaCache;
2701   OwnerThread()->Dispatch(NS_NewRunnableFunction(
2702       "MediaCacheStream::InitAsClone",
2703       [this, aOriginal, r1 = RefPtr<ChannelMediaResource>(mClient),
2704        r2 = RefPtr<ChannelMediaResource>(aOriginal->mClient)]() {
2705         InitAsCloneInternal(aOriginal);
2706       }));
2707 }
2708 
InitAsCloneInternal(MediaCacheStream * aOriginal)2709 void MediaCacheStream::InitAsCloneInternal(MediaCacheStream* aOriginal) {
2710   MOZ_ASSERT(OwnerThread()->IsOnCurrentThread());
2711   AutoLock lock(mMediaCache->Monitor());
2712   LOG("MediaCacheStream::InitAsCloneInternal(this=%p, original=%p)", this,
2713       aOriginal);
2714 
2715   // Download data and notify events if necessary. Note the order is important
2716   // in order to mimic the behavior of data being downloaded from the channel.
2717 
2718   // Step 1: copy/download data from the original stream.
2719   mResourceID = aOriginal->mResourceID;
2720   mStreamLength = aOriginal->mStreamLength;
2721   mIsTransportSeekable = aOriginal->mIsTransportSeekable;
2722   mDownloadStatistics = aOriginal->mDownloadStatistics;
2723   mDownloadStatistics.Stop();
2724 
2725   // Grab cache blocks from aOriginal as readahead blocks for our stream
2726   for (uint32_t i = 0; i < aOriginal->mBlocks.Length(); ++i) {
2727     int32_t cacheBlockIndex = aOriginal->mBlocks[i];
2728     if (cacheBlockIndex < 0) continue;
2729 
2730     while (i >= mBlocks.Length()) {
2731       mBlocks.AppendElement(-1);
2732     }
2733     // Every block is a readahead block for the clone because the clone's
2734     // initial stream offset is zero
2735     mMediaCache->AddBlockOwnerAsReadahead(lock, cacheBlockIndex, this, i);
2736   }
2737 
2738   // Copy the partial block.
2739   mChannelOffset = aOriginal->mChannelOffset;
2740   memcpy(mPartialBlockBuffer.get(), aOriginal->mPartialBlockBuffer.get(),
2741          BLOCK_SIZE);
2742 
2743   // Step 2: notify the client that we have new data so the decoder has a chance
2744   // to compute 'canplaythrough' and buffer ranges.
2745   mClient->CacheClientNotifyDataReceived();
2746 
2747   // Step 3: notify download ended if necessary.
2748   if (aOriginal->mDidNotifyDataEnded &&
2749       NS_SUCCEEDED(aOriginal->mNotifyDataEndedStatus)) {
2750     mNotifyDataEndedStatus = aOriginal->mNotifyDataEndedStatus;
2751     mDidNotifyDataEnded = true;
2752     mClient->CacheClientNotifyDataEnded(mNotifyDataEndedStatus);
2753   }
2754 
2755   // Step 4: notify download is suspended by the cache.
2756   mClientSuspended = true;
2757   mCacheSuspended = true;
2758   mChannelEnded = true;
2759   mClient->CacheClientSuspend();
2760   mMediaCache->QueueSuspendedStatusUpdate(lock, mResourceID);
2761 
2762   // Step 5: add the stream to be managed by the cache.
2763   mMediaCache->OpenStream(lock, this, true /* aIsClone */);
2764   // Wake up the reader which is waiting for the cloned data.
2765   lock.NotifyAll();
2766 }
2767 
OwnerThread() const2768 nsISerialEventTarget* MediaCacheStream::OwnerThread() const {
2769   return mMediaCache->OwnerThread();
2770 }
2771 
GetCachedRanges(MediaByteRangeSet & aRanges)2772 nsresult MediaCacheStream::GetCachedRanges(MediaByteRangeSet& aRanges) {
2773   MOZ_ASSERT(!NS_IsMainThread());
2774   // Take the monitor, so that the cached data ranges can't grow while we're
2775   // trying to loop over them.
2776   AutoLock lock(mMediaCache->Monitor());
2777 
2778   // We must be pinned while running this, otherwise the cached data ranges may
2779   // shrink while we're trying to loop over them.
2780   NS_ASSERTION(mPinCount > 0, "Must be pinned");
2781 
2782   int64_t startOffset = GetNextCachedDataInternal(lock, 0);
2783   while (startOffset >= 0) {
2784     int64_t endOffset = GetCachedDataEndInternal(lock, startOffset);
2785     NS_ASSERTION(startOffset < endOffset,
2786                  "Buffered range must end after its start");
2787     // Bytes [startOffset..endOffset] are cached.
2788     aRanges += MediaByteRange(startOffset, endOffset);
2789     startOffset = GetNextCachedDataInternal(lock, endOffset);
2790     NS_ASSERTION(
2791         startOffset == -1 || startOffset > endOffset,
2792         "Must have advanced to start of next range, or hit end of stream");
2793   }
2794   return NS_OK;
2795 }
2796 
GetDownloadRate(bool * aIsReliable)2797 double MediaCacheStream::GetDownloadRate(bool* aIsReliable) {
2798   MOZ_ASSERT(!NS_IsMainThread());
2799   AutoLock lock(mMediaCache->Monitor());
2800   return mDownloadStatistics.GetRate(aIsReliable);
2801 }
2802 
GetDebugInfo(dom::MediaCacheStreamDebugInfo & aInfo)2803 void MediaCacheStream::GetDebugInfo(dom::MediaCacheStreamDebugInfo& aInfo) {
2804   AutoLock lock(mMediaCache->GetMonitorOnTheMainThread());
2805   aInfo.mStreamLength = mStreamLength;
2806   aInfo.mChannelOffset = mChannelOffset;
2807   aInfo.mCacheSuspended = mCacheSuspended;
2808   aInfo.mChannelEnded = mChannelEnded;
2809   aInfo.mLoadID = mLoadID;
2810 }
2811 
2812 }  // namespace mozilla
2813 
2814 // avoid redefined macro in unified build
2815 #undef LOG
2816 #undef LOGI
2817