1 /*
2  *  Copyright (C) 2005-2018 Team Kodi
3  *  This file is part of Kodi - https://kodi.tv
4  *
5  *  SPDX-License-Identifier: GPL-2.0-or-later
6  *  See LICENSES/README.md for more information.
7  */
8 
9 #include "threads/SystemClock.h"
10 #include "FileCache.h"
11 #include "threads/Thread.h"
12 #include "File.h"
13 #include "URL.h"
14 #include "ServiceBroker.h"
15 
16 #include "CircularCache.h"
17 #include "threads/SingleLock.h"
18 #include "utils/log.h"
19 #include "settings/AdvancedSettings.h"
20 #include "settings/SettingsComponent.h"
21 
22 #if !defined(TARGET_WINDOWS)
23 #include "platform/posix/ConvUtils.h"
24 #endif
25 
26 #include <algorithm>
27 #include <cassert>
28 #include <inttypes.h>
29 #include <memory>
30 
31 #ifdef TARGET_POSIX
32 #include "platform/posix/ConvUtils.h"
33 #endif
34 
35 using namespace XFILE;
36 
37 class CWriteRate
38 {
39 public:
CWriteRate()40   CWriteRate()
41   {
42     m_stamp = XbmcThreads::SystemClockMillis();
43     m_pos   = 0;
44     m_size = 0;
45     m_time = 0;
46   }
47 
Reset(int64_t pos,bool bResetAll=true)48   void Reset(int64_t pos, bool bResetAll = true)
49   {
50     m_stamp = XbmcThreads::SystemClockMillis();
51     m_pos   = pos;
52 
53     if (bResetAll)
54     {
55       m_size  = 0;
56       m_time  = 0;
57     }
58   }
59 
Rate(int64_t pos,unsigned int time_bias=0)60   unsigned Rate(int64_t pos, unsigned int time_bias = 0)
61   {
62     const unsigned ts = XbmcThreads::SystemClockMillis();
63 
64     m_size += (pos - m_pos);
65     m_time += (ts - m_stamp);
66     m_pos = pos;
67     m_stamp = ts;
68 
69     if (m_time == 0)
70       return 0;
71 
72     return (unsigned)(1000 * (m_size / (m_time + time_bias)));
73   }
74 
75 private:
76   unsigned m_stamp;
77   int64_t  m_pos;
78   unsigned m_time;
79   int64_t  m_size;
80 };
81 
82 
CFileCache(const unsigned int flags)83 CFileCache::CFileCache(const unsigned int flags)
84   : CThread("FileCache")
85   , m_seekPossible(0)
86   , m_nSeekResult(0)
87   , m_seekPos(0)
88   , m_readPos(0)
89   , m_writePos(0)
90   , m_chunkSize(0)
91   , m_writeRate(0)
92   , m_writeRateActual(0)
93   , m_forwardCacheSize(0)
94   , m_bFilling(false)
95   , m_bLowSpeedDetected(false)
96   , m_fileSize(0)
97   , m_flags(flags)
98 {
99 }
100 
~CFileCache()101 CFileCache::~CFileCache()
102 {
103   Close();
104 }
105 
GetFileImp()106 IFile *CFileCache::GetFileImp()
107 {
108   return m_source.GetImplementation();
109 }
110 
Open(const CURL & url)111 bool CFileCache::Open(const CURL& url)
112 {
113   Close();
114 
115   CSingleLock lock(m_sync);
116 
117   m_sourcePath = url.GetRedacted();
118 
119   CLog::Log(LOGDEBUG, "CFileCache::{} - <{}> opening", __FUNCTION__, m_sourcePath);
120 
121   // opening the source file.
122   if (!m_source.Open(url.Get(), READ_NO_CACHE | READ_TRUNCATED | READ_CHUNKED))
123   {
124     CLog::Log(LOGERROR, "CFileCache::{} - <{}> failed to open", __FUNCTION__, m_sourcePath);
125     Close();
126     return false;
127   }
128 
129   m_source.IoControl(IOCTRL_SET_CACHE, this);
130 
131   bool retry = false;
132   m_source.IoControl(IOCTRL_SET_RETRY, &retry); // We already handle retrying ourselves
133 
134   // check if source can seek
135   m_seekPossible = m_source.IoControl(IOCTRL_SEEK_POSSIBLE, NULL);
136 
137   // Determine the best chunk size we can use
138   m_chunkSize = CFile::DetermineChunkSize(
139       m_source.GetChunkSize(),
140       CServiceBroker::GetSettingsComponent()->GetAdvancedSettings()->m_cacheChunkSize);
141   CLog::Log(LOGDEBUG,
142             "CFileCache::{} - <{}> source chunk size is {}, setting cache chunk size to {}",
143             __FUNCTION__, m_sourcePath, m_source.GetChunkSize(), m_chunkSize);
144 
145   m_fileSize = m_source.GetLength();
146 
147   if (!m_pCache)
148   {
149     if (CServiceBroker::GetSettingsComponent()->GetAdvancedSettings()->m_cacheMemSize == 0)
150     {
151       // Use cache on disk
152       m_pCache = std::unique_ptr<CSimpleFileCache>(new CSimpleFileCache()); // C++14 - Replace with std::make_unique
153       m_forwardCacheSize = 0;
154     }
155     else
156     {
157       size_t cacheSize;
158       if (m_fileSize > 0 && m_fileSize < CServiceBroker::GetSettingsComponent()->GetAdvancedSettings()->m_cacheMemSize && !(m_flags & READ_AUDIO_VIDEO))
159       {
160         // Cap cache size by filesize, but not for audio/video files as those may grow.
161         // We don't need to take into account READ_MULTI_STREAM here as that's only used for audio/video
162         cacheSize = m_fileSize;
163 
164         // Cap chunk size by cache size
165         if (m_chunkSize > cacheSize)
166           m_chunkSize = cacheSize;
167       }
168       else
169       {
170         cacheSize = CServiceBroker::GetSettingsComponent()->GetAdvancedSettings()->m_cacheMemSize;
171 
172         // NOTE: READ_MULTI_STREAM is only used with READ_AUDIO_VIDEO
173         if (m_flags & READ_MULTI_STREAM)
174         {
175           // READ_MULTI_STREAM requires double buffering, so use half the amount of memory for each buffer
176           cacheSize /= 2;
177         }
178 
179         // Make sure cache can at least hold 2 chunks
180         if (cacheSize < m_chunkSize * 2)
181           cacheSize = m_chunkSize * 2;
182       }
183 
184       if (m_flags & READ_MULTI_STREAM)
185         CLog::Log(LOGDEBUG, "CFileCache::{} - <{}> using double memory cache each sized {} bytes",
186                   __FUNCTION__, m_sourcePath, cacheSize);
187       else
188         CLog::Log(LOGDEBUG, "CFileCache::{} - <{}> using single memory cache sized {} bytes",
189                   __FUNCTION__, m_sourcePath, cacheSize);
190 
191       const size_t back = cacheSize / 4;
192       const size_t front = cacheSize - back;
193 
194       m_pCache = std::unique_ptr<CCircularCache>(new CCircularCache(front, back)); // C++14 - Replace with std::make_unique
195       m_forwardCacheSize = front;
196     }
197 
198     if (m_flags & READ_MULTI_STREAM)
199     {
200       // If READ_MULTI_STREAM flag is set: Double buffering is required
201       m_pCache = std::unique_ptr<CDoubleCache>(new CDoubleCache(m_pCache.release())); // C++14 - Replace with std::make_unique
202     }
203   }
204 
205   // open cache strategy
206   if (!m_pCache || m_pCache->Open() != CACHE_RC_OK)
207   {
208     CLog::Log(LOGERROR, "CFileCache::{} - <{}> failed to open cache", __FUNCTION__, m_sourcePath);
209     Close();
210     return false;
211   }
212 
213   m_readPos = 0;
214   m_writePos = 0;
215   m_writeRate = 1024 * 1024;
216   m_writeRateActual = 0;
217   m_bFilling = true;
218   m_bLowSpeedDetected = false;
219   m_seekEvent.Reset();
220   m_seekEnded.Reset();
221 
222   CThread::Create(false);
223 
224   return true;
225 }
226 
Process()227 void CFileCache::Process()
228 {
229   if (!m_pCache)
230   {
231     CLog::Log(LOGERROR, "CFileCache::{} - <{}> sanity failed. no cache strategy", __FUNCTION__,
232               m_sourcePath);
233     return;
234   }
235 
236   // create our read buffer
237   std::unique_ptr<char[]> buffer(new char[m_chunkSize]);
238   if (buffer == nullptr)
239   {
240     CLog::Log(LOGERROR, "CFileCache::{} - <{}> failed to allocate read buffer", __FUNCTION__,
241               m_sourcePath);
242     return;
243   }
244 
245   CWriteRate limiter;
246   CWriteRate average;
247 
248   while (!m_bStop)
249   {
250     // Update filesize
251     m_fileSize = m_source.GetLength();
252 
253     // check for seek events
254     if (m_seekEvent.WaitMSec(0))
255     {
256       m_seekEvent.Reset();
257       int64_t cacheMaxPos = m_pCache->CachedDataEndPosIfSeekTo(m_seekPos);
258       const bool cacheReachEOF = (cacheMaxPos == m_fileSize);
259       bool sourceSeekFailed = false;
260       if (!cacheReachEOF)
261       {
262         m_nSeekResult = m_source.Seek(cacheMaxPos, SEEK_SET);
263         if (m_nSeekResult != cacheMaxPos)
264         {
265           CLog::Log(LOGERROR, "CFileCache::{} - <{}> error {} seeking. Seek returned {}",
266                     __FUNCTION__, m_sourcePath, GetLastError(), m_nSeekResult);
267           m_seekPossible = m_source.IoControl(IOCTRL_SEEK_POSSIBLE, NULL);
268           sourceSeekFailed = true;
269         }
270       }
271       if (!sourceSeekFailed)
272       {
273         const bool bCompleteReset = m_pCache->Reset(m_seekPos, false);
274         m_readPos = m_seekPos;
275         m_writePos = m_pCache->CachedDataEndPos();
276         assert(m_writePos == cacheMaxPos);
277         average.Reset(m_writePos, bCompleteReset); // Can only recalculate new average from scratch after a full reset (empty cache)
278         limiter.Reset(m_writePos);
279         m_nSeekResult = m_seekPos;
280         if (bCompleteReset)
281         {
282           CLog::Log(LOGDEBUG,
283                     "CFileCache::{} - <{}> cache completely reset for seek to position {}",
284                     __FUNCTION__, m_sourcePath, m_seekPos);
285           m_bFilling = true;
286           m_bLowSpeedDetected = false;
287         }
288       }
289 
290       m_seekEnded.Set();
291     }
292 
293     while (m_writeRate)
294     {
295       if (m_writePos - m_readPos < m_writeRate * CServiceBroker::GetSettingsComponent()->GetAdvancedSettings()->m_cacheReadFactor)
296       {
297         limiter.Reset(m_writePos);
298         break;
299       }
300 
301       if (limiter.Rate(m_writePos) < m_writeRate * CServiceBroker::GetSettingsComponent()->GetAdvancedSettings()->m_cacheReadFactor)
302         break;
303 
304       if (m_seekEvent.WaitMSec(100))
305       {
306         if (!m_bStop)
307           m_seekEvent.Set();
308         break;
309       }
310     }
311 
312     const int64_t maxWrite = m_pCache->GetMaxWriteSize(m_chunkSize);
313     int64_t maxSourceRead = m_chunkSize;
314     // Cap source read size by space available between current write position and EOF
315     if (m_fileSize != 0)
316       maxSourceRead = std::min(maxSourceRead, m_fileSize - m_writePos);
317 
318     /* Only read from source if there's enough write space in the cache
319      * else we may keep disposing data and seeking back on (slow) source
320      */
321     if (maxWrite < maxSourceRead)
322     {
323       // Wait until sufficient cache write space is available
324       m_pCache->m_space.WaitMSec(5);
325       continue;
326     }
327 
328     ssize_t iRead = 0;
329     if (maxSourceRead > 0)
330       iRead = m_source.Read(buffer.get(), maxSourceRead);
331     if (iRead <= 0)
332     {
333       // Check for actual EOF and retry as long as we still have data in our cache
334       if (m_writePos < m_fileSize && m_pCache->WaitForData(0, 0) > 0)
335       {
336         CLog::Log(LOGWARNING, "CFileCache::{} - <{}> source read returned {}! Will retry",
337                   __FUNCTION__, m_sourcePath, iRead);
338 
339         // Wait a bit:
340         if (m_seekEvent.WaitMSec(2000))
341         {
342           if (!m_bStop)
343             m_seekEvent.Set(); // hack so that later we realize seek is needed
344         }
345 
346         // and retry:
347         continue; // while (!m_bStop)
348       }
349       else
350       {
351         if (iRead < 0)
352           CLog::Log(LOGERROR,
353                     "{} - <{}> source read failed with {}!", __FUNCTION__, m_sourcePath, iRead);
354         else if (m_fileSize == 0)
355           CLog::Log(LOGDEBUG,
356                     "CFileCache::{} - <{}> source read didn't return any data! Hit eof(?)",
357                     __FUNCTION__, m_sourcePath);
358         else if (m_writePos < m_fileSize)
359           CLog::Log(LOGERROR,
360                     "CFileCache::{} - <{}> source read didn't return any data before eof!",
361                     __FUNCTION__, m_sourcePath);
362         else
363           CLog::Log(LOGDEBUG, "CFileCache::{} - <{}> source read hit eof", __FUNCTION__,
364                     m_sourcePath);
365 
366         m_pCache->EndOfInput();
367 
368         // The thread event will now also cause the wait of an event to return a false.
369         if (AbortableWait(m_seekEvent) == WAIT_SIGNALED)
370         {
371           m_pCache->ClearEndOfInput();
372           if (!m_bStop)
373             m_seekEvent.Set(); // hack so that later we realize seek is needed
374         }
375         else
376           break; // while (!m_bStop)
377       }
378     }
379 
380     int iTotalWrite = 0;
381     while (!m_bStop && (iTotalWrite < iRead))
382     {
383       int iWrite = 0;
384       iWrite = m_pCache->WriteToCache(buffer.get() + iTotalWrite, iRead - iTotalWrite);
385 
386       // write should always work. all handling of buffering and errors should be
387       // done inside the cache strategy. only if unrecoverable error happened, WriteToCache would return error and we break.
388       if (iWrite < 0)
389       {
390         CLog::Log(LOGERROR, "CFileCache::{} - <{}> error writing to cache", __FUNCTION__,
391                   m_sourcePath);
392         m_bStop = true;
393         break;
394       }
395       else if (iWrite == 0)
396       {
397         m_pCache->m_space.WaitMSec(5);
398       }
399 
400       iTotalWrite += iWrite;
401 
402       // check if seek was asked. otherwise if cache is full we'll freeze.
403       if (m_seekEvent.WaitMSec(0))
404       {
405         if (!m_bStop)
406           m_seekEvent.Set(); // make sure we get the seek event later.
407         break;
408       }
409     }
410 
411     m_writePos += iTotalWrite;
412 
413     // under estimate write rate by a second, to
414     // avoid uncertainty at start of caching
415     m_writeRateActual = average.Rate(m_writePos, 1000);
416 
417     // NOTE: Hysteresis (20-80%) for filling-logic
418     const int64_t forward = m_pCache->WaitForData(0, 0);
419     const float level =
420         (m_forwardCacheSize == 0) ? 0.0 : static_cast<float>(forward) / m_forwardCacheSize;
421 
422     if (level > 0.8f)
423     {
424      /* NOTE: We can only reliably test for low speed condition, when the cache is *really*
425       * filling. This is because as soon as it's full the average-
426       * rate will become approximately the current-rate which can flag false
427       * low read-rate conditions.
428       */
429       if (m_bFilling && m_writeRateActual < m_writeRate)
430         m_bLowSpeedDetected = true;
431 
432       m_bFilling = false;
433     }
434     else if (level < 0.2f)
435     {
436       m_bFilling = true;
437     }
438   }
439 }
440 
OnExit()441 void CFileCache::OnExit()
442 {
443   m_bStop = true;
444 
445   // make sure cache is set to mark end of file (read may be waiting).
446   if (m_pCache)
447     m_pCache->EndOfInput();
448 
449   // just in case someone's waiting...
450   m_seekEnded.Set();
451 }
452 
Exists(const CURL & url)453 bool CFileCache::Exists(const CURL& url)
454 {
455   return CFile::Exists(url.Get());
456 }
457 
Stat(const CURL & url,struct __stat64 * buffer)458 int CFileCache::Stat(const CURL& url, struct __stat64* buffer)
459 {
460   return CFile::Stat(url.Get(), buffer);
461 }
462 
Read(void * lpBuf,size_t uiBufSize)463 ssize_t CFileCache::Read(void* lpBuf, size_t uiBufSize)
464 {
465   CSingleLock lock(m_sync);
466   if (!m_pCache)
467   {
468     CLog::Log(LOGERROR, "CFileCache::{} - <{}> sanity failed. no cache strategy!", __FUNCTION__,
469               m_sourcePath);
470     return -1;
471   }
472   int64_t iRc;
473 
474   if (uiBufSize > SSIZE_MAX)
475     uiBufSize = SSIZE_MAX;
476 
477 retry:
478   // attempt to read
479   iRc = m_pCache->ReadFromCache((char *)lpBuf, uiBufSize);
480   if (iRc > 0)
481   {
482     m_readPos += iRc;
483     return (int)iRc;
484   }
485 
486   if (iRc == CACHE_RC_WOULD_BLOCK)
487   {
488     // just wait for some data to show up
489     iRc = m_pCache->WaitForData(1, 10000);
490     if (iRc > 0)
491       goto retry;
492   }
493 
494   if (iRc == CACHE_RC_TIMEOUT)
495   {
496     CLog::Log(LOGWARNING, "CFileCache::{} - <{}> timeout waiting for data", __FUNCTION__,
497               m_sourcePath);
498     return -1;
499   }
500 
501   if (iRc == 0)
502     return 0;
503 
504   // unknown error code
505   CLog::Log(LOGERROR, "CFileCache::{} - <{}> cache strategy returned unknown error code {}",
506             __FUNCTION__, m_sourcePath, (int)iRc);
507   return -1;
508 }
509 
Seek(int64_t iFilePosition,int iWhence)510 int64_t CFileCache::Seek(int64_t iFilePosition, int iWhence)
511 {
512   CSingleLock lock(m_sync);
513 
514   if (!m_pCache)
515   {
516     CLog::Log(LOGERROR, "CFileCache::{} - <{}> sanity failed. no cache strategy!", __FUNCTION__,
517               m_sourcePath);
518     return -1;
519   }
520 
521   int64_t iCurPos = m_readPos;
522   int64_t iTarget = iFilePosition;
523   if (iWhence == SEEK_END)
524     iTarget = m_fileSize + iTarget;
525   else if (iWhence == SEEK_CUR)
526     iTarget = iCurPos + iTarget;
527   else if (iWhence != SEEK_SET)
528     return -1;
529 
530   if (iTarget == m_readPos)
531     return m_readPos;
532 
533   if ((m_nSeekResult = m_pCache->Seek(iTarget)) != iTarget)
534   {
535     if (m_seekPossible == 0)
536       return m_nSeekResult;
537 
538     /* never request closer to end than 2k, speeds up tag reading */
539     m_seekPos = std::min(iTarget, std::max((int64_t)0, m_fileSize - m_chunkSize));
540 
541     m_seekEvent.Set();
542     while (!m_seekEnded.WaitMSec(100))
543     {
544       // SeekEnded will never be set if FileCache thread is not running
545       if (!CThread::IsRunning())
546         return -1;
547     }
548 
549     /* wait for any remaining data */
550     if(m_seekPos < iTarget)
551     {
552       CLog::Log(LOGDEBUG, "CFileCache::{} - <{}> waiting for position {}", __FUNCTION__,
553                 m_sourcePath, iTarget);
554       if(m_pCache->WaitForData((unsigned)(iTarget - m_seekPos), 10000) < iTarget - m_seekPos)
555       {
556         CLog::Log(LOGWARNING, "CFileCache::{} - <{}> failed to get remaining data", __FUNCTION__,
557                   m_sourcePath);
558         return -1;
559       }
560       m_pCache->Seek(iTarget);
561     }
562     m_readPos = iTarget;
563     m_seekEvent.Reset();
564   }
565   else
566     m_readPos = iTarget;
567 
568   return iTarget;
569 }
570 
Close()571 void CFileCache::Close()
572 {
573   StopThread();
574 
575   CSingleLock lock(m_sync);
576   if (m_pCache)
577     m_pCache->Close();
578 
579   m_source.Close();
580 }
581 
GetPosition()582 int64_t CFileCache::GetPosition()
583 {
584   return m_readPos;
585 }
586 
GetLength()587 int64_t CFileCache::GetLength()
588 {
589   return m_fileSize;
590 }
591 
StopThread(bool bWait)592 void CFileCache::StopThread(bool bWait /*= true*/)
593 {
594   m_bStop = true;
595   //Process could be waiting for seekEvent
596   m_seekEvent.Set();
597   CThread::StopThread(bWait);
598 }
599 
GetProperty(XFILE::FileProperty type,const std::string & name) const600 const std::string CFileCache::GetProperty(XFILE::FileProperty type, const std::string &name) const
601 {
602   if (!m_source.GetImplementation())
603     return IFile::GetProperty(type, name);
604 
605   return m_source.GetImplementation()->GetProperty(type, name);
606 }
607 
IoControl(EIoControl request,void * param)608 int CFileCache::IoControl(EIoControl request, void* param)
609 {
610   if (request == IOCTRL_CACHE_STATUS)
611   {
612     SCacheStatus* status = (SCacheStatus*)param;
613     status->forward = m_pCache->WaitForData(0, 0);
614     status->maxrate = m_writeRate;
615     status->currate = m_writeRateActual;
616     status->lowspeed = m_bLowSpeedDetected;
617     m_bLowSpeedDetected = false; // Reset flag
618     return 0;
619   }
620 
621   if (request == IOCTRL_CACHE_SETRATE)
622   {
623     m_writeRate = *(unsigned*)param;
624     return 0;
625   }
626 
627   if (request == IOCTRL_SEEK_POSSIBLE)
628     return m_seekPossible;
629 
630   return -1;
631 }
632