1 /* SPDX-FileCopyrightText: 2018-2020 The KPhotoAlbum Development Team
2 
3    SPDX-License-Identifier: GPL-2.0-or-later
4 */
5 
6 #include "ImageScout.h"
7 
8 #include <kpabase/Logging.h>
9 
10 #include <QAtomicInt>
11 #include <QDataStream>
12 #include <QFile>
13 #include <QMutexLocker>
14 #include <QThread>
15 
16 extern "C" {
17 #include <fcntl.h>
18 #include <unistd.h>
19 }
20 
21 using namespace DB;
22 
namespace
{
// Default size, in bytes, of the scratch buffer each scout thread reads into
// (the buffer is a char array handed to read()).
constexpr int DEFAULT_SCOUT_BUFFER_SIZE { 1048576 };
// Default number of images the scouts may run ahead of the loader.
// We might want this to be bytes rather than images.
constexpr int DEFAULT_MAX_SEEKAHEAD_IMAGES { 10 };
// Sleep interval (milliseconds) between checks while a scout is too far ahead.
constexpr int SEEKAHEAD_WAIT_MS { 10 };
// Sleep interval (milliseconds) between checks while waiting for a scout to finish.
constexpr int TERMINATION_WAIT_MS { 10 };
}
31 
32 // 1048576 with a single scout thread empirically yields best performance
33 // on a Seagate 2TB 2.5" disk, sustaining throughput in the range of
34 // 95-100 MB/sec with 100-110 IO/sec on large files.  This is close to what
35 // would be expected.  A SATA SSD (Crucial MX300) is much less sensitive to
36 // I/O size and scout thread, achieving about 340 MB/sec with high CPU
37 // utilization.
38 
39 class DB::ImageScoutThread : public QThread
40 {
41     friend class DB::ImageScout;
42 
43 public:
44     ImageScoutThread(ImageScoutQueue &, QMutex *, QAtomicInt &count,
45                      QAtomicInt &preloadCount, QAtomicInt &skippedCount);
46 
47 protected:
48     void run() override;
49     void setBufSize(int);
50     int getBufSize();
51     void setMaxSeekAhead(int);
52     int getMaxSeekAhead();
53     void setReadLimit(int);
54     int getReadLimit();
55     void setPreloadFunc(PreloadFunc);
56     PreloadFunc getPreloadFunc();
57 
58 private:
59     void doRun(char *);
60     ImageScoutQueue &m_queue;
61     QMutex *m_mutex;
62     QAtomicInt &m_loadedCount;
63     QAtomicInt &m_preloadedCount;
64     QAtomicInt &m_skippedCount;
65     int m_scoutBufSize;
66     int m_maxSeekAhead;
67     int m_readLimit;
68     PreloadFunc m_preloadFunc;
69     bool m_isStarted;
70 };
71 
ImageScoutThread(ImageScoutQueue & queue,QMutex * mutex,QAtomicInt & count,QAtomicInt & preloadedCount,QAtomicInt & skippedCount)72 ImageScoutThread::ImageScoutThread(ImageScoutQueue &queue, QMutex *mutex,
73                                    QAtomicInt &count,
74                                    QAtomicInt &preloadedCount,
75                                    QAtomicInt &skippedCount)
76     : m_queue(queue)
77     , m_mutex(mutex)
78     , m_loadedCount(count)
79     , m_preloadedCount(preloadedCount)
80     , m_skippedCount(skippedCount)
81     , m_scoutBufSize(DEFAULT_SCOUT_BUFFER_SIZE)
82     , m_maxSeekAhead(DEFAULT_MAX_SEEKAHEAD_IMAGES)
83     , m_readLimit(-1)
84     , m_preloadFunc(NULL)
85     , m_isStarted(false)
86 {
87 }
88 
doRun(char * tmpBuf)89 void ImageScoutThread::doRun(char *tmpBuf)
90 {
91     while (!isInterruptionRequested()) {
92         QMutexLocker locker(m_mutex);
93         if (m_queue.isEmpty()) {
94             return;
95         }
96         DB::FileName fileName = m_queue.dequeue();
97         locker.unlock();
98         // If we're behind the reader, move along
99         m_preloadedCount++;
100         if (m_loadedCount.load() >= m_preloadedCount.load()) {
101             m_skippedCount++;
102             continue;
103         } else {
104             // Don't get too far ahead of the loader, or we just waste memory
105             // TODO: wait on something rather than polling
106             while (m_preloadedCount.load() >= m_loadedCount.load() + m_maxSeekAhead && !isInterruptionRequested()) {
107                 QThread::msleep(SEEKAHEAD_WAIT_MS);
108             }
109             // qCDebug(DBImageScoutLog) << ">>>>>Scout: preload" << m_preloadedCount.load() << "load" << m_loadedCount.load() << fileName.relative();
110         }
111         if (m_preloadFunc) {
112             (*m_preloadFunc)(fileName);
113         } else {
114             // Note(jzarl): for Windows, we'd need a functional replacement for open(), read(), close() in unistd.h
115             int inputFD = open(QFile::encodeName(fileName.absolute()).constData(), O_RDONLY);
116             int bytesRead = 0;
117             if (inputFD >= 0) {
118                 while (read(inputFD, tmpBuf, m_scoutBufSize) && (m_readLimit < 0 || ((bytesRead += m_scoutBufSize) < m_readLimit)) && !isInterruptionRequested()) {
119                 }
120                 (void)close(inputFD);
121             }
122         }
123     }
124 }
125 
setBufSize(int bufSize)126 void ImageScoutThread::setBufSize(int bufSize)
127 {
128     if (!m_isStarted)
129         m_scoutBufSize = bufSize;
130 }
131 
getBufSize()132 int ImageScoutThread::getBufSize()
133 {
134     return m_scoutBufSize;
135 }
136 
setMaxSeekAhead(int maxSeekAhead)137 void ImageScoutThread::setMaxSeekAhead(int maxSeekAhead)
138 {
139     if (!m_isStarted)
140         m_maxSeekAhead = maxSeekAhead;
141 }
142 
getMaxSeekAhead()143 int ImageScoutThread::getMaxSeekAhead()
144 {
145     return m_maxSeekAhead;
146 }
147 
setReadLimit(int readLimit)148 void ImageScoutThread::setReadLimit(int readLimit)
149 {
150     if (!m_isStarted)
151         m_readLimit = readLimit;
152 }
153 
getReadLimit()154 int ImageScoutThread::getReadLimit()
155 {
156     return m_readLimit;
157 }
158 
setPreloadFunc(PreloadFunc scoutFunc)159 void ImageScoutThread::setPreloadFunc(PreloadFunc scoutFunc)
160 {
161     if (!m_isStarted)
162         m_preloadFunc = scoutFunc;
163 }
164 
getPreloadFunc()165 PreloadFunc ImageScoutThread::getPreloadFunc()
166 {
167     return m_preloadFunc;
168 }
169 
run()170 void ImageScoutThread::run()
171 {
172     m_isStarted = true;
173     char *tmpBuf = new char[m_scoutBufSize];
174     doRun(tmpBuf);
175     delete[] tmpBuf;
176 }
177 
ImageScout(ImageScoutQueue & images,QAtomicInt & count,int threads)178 ImageScout::ImageScout(ImageScoutQueue &images,
179                        QAtomicInt &count,
180                        int threads)
181     : m_preloadedCount(0)
182     , m_skippedCount(0)
183     , m_isStarted(false)
184     , m_scoutBufSize(DEFAULT_SCOUT_BUFFER_SIZE)
185     , m_maxSeekAhead(DEFAULT_MAX_SEEKAHEAD_IMAGES)
186     , m_readLimit(-1)
187     , m_preloadFunc(NULL)
188 {
189     if (threads > 0) {
190         for (int i = 0; i < threads; i++) {
191             ImageScoutThread *t = new ImageScoutThread(images,
192                                                        threads > 1 ? &m_mutex : nullptr,
193                                                        count,
194                                                        m_preloadedCount,
195                                                        m_skippedCount);
196             m_scoutList.append(t);
197         }
198     }
199 }
200 
~ImageScout()201 ImageScout::~ImageScout()
202 {
203     if (m_scoutList.count() > 0) {
204         for (QList<ImageScoutThread *>::iterator it = m_scoutList.begin();
205              it != m_scoutList.end(); ++it) {
206             if (m_isStarted) {
207                 if (!(*it)->isFinished()) {
208                     (*it)->requestInterruption();
209                     while (!(*it)->isFinished())
210                         QThread::msleep(TERMINATION_WAIT_MS);
211                 }
212             }
213             delete (*it);
214         }
215     }
216     qCDebug(DBImageScoutLog) << "Total files:" << m_preloadedCount << "skipped" << m_skippedCount;
217 }
218 
start()219 void ImageScout::start()
220 {
221     // Yes, there's a race condition here between isStartd and setting
222     // the buf size or seek ahead...but this isn't a hot code path!
223     if (!m_isStarted && m_scoutList.count() > 0) {
224         m_isStarted = true;
225         for (QList<ImageScoutThread *>::iterator it = m_scoutList.begin();
226              it != m_scoutList.end(); ++it) {
227             (*it)->start();
228         }
229     }
230 }
231 
setBufSize(int bufSize)232 void ImageScout::setBufSize(int bufSize)
233 {
234     if (!m_isStarted && bufSize > 0) {
235         m_scoutBufSize = bufSize;
236         for (QList<ImageScoutThread *>::iterator it = m_scoutList.begin();
237              it != m_scoutList.end(); ++it) {
238             (*it)->setBufSize(m_scoutBufSize);
239         }
240     }
241 }
242 
getBufSize()243 int ImageScout::getBufSize()
244 {
245     return m_scoutBufSize;
246 }
247 
setMaxSeekAhead(int maxSeekAhead)248 void ImageScout::setMaxSeekAhead(int maxSeekAhead)
249 {
250     if (!m_isStarted && maxSeekAhead > 0) {
251         m_maxSeekAhead = maxSeekAhead;
252         for (QList<ImageScoutThread *>::iterator it = m_scoutList.begin();
253              it != m_scoutList.end(); ++it) {
254             (*it)->setMaxSeekAhead(m_maxSeekAhead);
255         }
256     }
257 }
258 
getMaxSeekAhead()259 int ImageScout::getMaxSeekAhead()
260 {
261     return m_maxSeekAhead;
262 }
263 
setReadLimit(int readLimit)264 void ImageScout::setReadLimit(int readLimit)
265 {
266     if (!m_isStarted && readLimit > 0) {
267         m_readLimit = readLimit;
268         for (QList<ImageScoutThread *>::iterator it = m_scoutList.begin();
269              it != m_scoutList.end(); ++it) {
270             (*it)->setReadLimit(m_readLimit);
271         }
272     }
273 }
274 
getReadLimit()275 int ImageScout::getReadLimit()
276 {
277     return m_readLimit;
278 }
279 
setPreloadFunc(PreloadFunc scoutFunc)280 void ImageScout::setPreloadFunc(PreloadFunc scoutFunc)
281 {
282     if (!m_isStarted) {
283         m_preloadFunc = scoutFunc;
284         for (QList<ImageScoutThread *>::iterator it = m_scoutList.begin();
285              it != m_scoutList.end(); ++it) {
286             (*it)->setPreloadFunc(m_preloadFunc);
287         }
288     }
289 }
290 
getPreloadFunc()291 PreloadFunc ImageScout::getPreloadFunc()
292 {
293     return m_preloadFunc;
294 }
295 
296 // vi:expandtab:tabstop=4 shiftwidth=4:
297