1 #include "analyzer/trackanalysisscheduler.h"
2 
3 #include "library/library.h"
4 #include "library/trackcollection.h"
5 #include "moc_trackanalysisscheduler.cpp"
6 #include "util/logger.h"
7 
8 namespace {
9 
10 mixxx::Logger kLogger("TrackAnalysisScheduler");
11 
12 constexpr QThread::Priority kWorkerThreadPriority = QThread::LowPriority;
13 
14 // Maximum frequency of progress updates
15 constexpr std::chrono::milliseconds kProgressInhibitDuration(100);
16 
deleteTrackAnalysisScheduler(TrackAnalysisScheduler * plainPtr)17 void deleteTrackAnalysisScheduler(TrackAnalysisScheduler* plainPtr) {
18     if (plainPtr) {
19         // Trigger stop
20         plainPtr->stop();
21         // Release ownership and let Qt delete the queue later
22         plainPtr->deleteLater();
23     }
24 }
25 
26 } // anonymous namespace
27 
NullPointer()28 TrackAnalysisScheduler::NullPointer::NullPointer()
29     : Pointer(nullptr, [](TrackAnalysisScheduler*){}) {
30 }
31 
32 //static
createInstance(Library * library,int numWorkerThreads,const UserSettingsPointer & pConfig,AnalyzerModeFlags modeFlags)33 TrackAnalysisScheduler::Pointer TrackAnalysisScheduler::createInstance(
34         Library* library,
35         int numWorkerThreads,
36         const UserSettingsPointer& pConfig,
37         AnalyzerModeFlags modeFlags) {
38     return Pointer(new TrackAnalysisScheduler(
39             library,
40             numWorkerThreads,
41             pConfig,
42             modeFlags),
43             deleteTrackAnalysisScheduler);
44 }
45 
// Creates the scheduler and spins up numWorkerThreads analyzer
// threads in a suspended state. The threads are started here but
// do not process any tracks until resume() is called.
TrackAnalysisScheduler::TrackAnalysisScheduler(
        Library* library,
        int numWorkerThreads,
        const UserSettingsPointer& pConfig,
        AnalyzerModeFlags modeFlags)
        : m_library(library),
          m_currentTrackProgress(kAnalyzerProgressUnknown),
          m_currentTrackNumber(0),
          m_dequeuedTracksCount(0),
          // The first signal should always be emitted
          m_lastProgressEmittedAt(Clock::now() - kProgressInhibitDuration) {
    // Sanity check: a scheduler without workers could never make progress
    VERIFY_OR_DEBUG_ASSERT(numWorkerThreads > 0) {
            kLogger.warning()
                    << "Invalid number of worker threads:"
                    << numWorkerThreads;
    } else {
        kLogger.debug()
                << "Starting"
                << numWorkerThreads
                << "worker threads. Priority: "
                << (modeFlags & AnalyzerModeFlags::LowPriority ? "low" : "normal");
    }
    // 1st pass: Create worker threads
    m_workers.reserve(numWorkerThreads);
    for (int threadId = 0; threadId < numWorkerThreads; ++threadId) {
        m_workers.emplace_back(AnalyzerThread::createInstance(
                threadId,
                library->dbConnectionPool(),
                pConfig,
                modeFlags));
        // Route all progress updates from the worker thread into the
        // central bookkeeping of this scheduler
        connect(m_workers.back().thread(), &AnalyzerThread::progress,
            this, &TrackAnalysisScheduler::onWorkerThreadProgress);
    }
    // 2nd pass: Start worker threads in a suspended state
    for (const auto& worker: m_workers) {
        // Suspend before start so the thread does not begin to
        // process tracks until explicitly resumed
        worker.thread()->suspend();
        worker.thread()->start(kWorkerThreadPriority);
    }
}
85 
// NOTE(review): The destructor only logs. Worker threads are stopped
// beforehand by the custom deleter (see deleteTrackAnalysisScheduler),
// which invokes stop() before scheduling deleteLater().
TrackAnalysisScheduler::~TrackAnalysisScheduler() {
    kLogger.debug() << "Destroying";
}
89 
// Emits finished() once all tracks are done, otherwise emits a
// rate-limited progress() update that aggregates the progress of
// all worker threads into a single virtual track number + progress.
void TrackAnalysisScheduler::emitProgressOrFinished() {
    // The finished() signal is emitted regardless of when the last
    // signal has been emitted
    if (allTracksFinished()) {
        // Reset all bookkeeping for the next batch of tracks
        m_currentTrackProgress = kAnalyzerProgressUnknown;
        m_currentTrackNumber = 0;
        m_dequeuedTracksCount = 0;
        emit finished();
        return;
    }

    const auto now = Clock::now();
    if (now < (m_lastProgressEmittedAt + kProgressInhibitDuration)) {
        // Inhibit signal
        // Rate limiting: at most one progress() emission per
        // kProgressInhibitDuration
        return;
    }
    m_lastProgressEmittedAt = now;

    // Every pending track has been dequeued before, so the number of
    // pending tracks can never exceed the number of dequeued tracks
    DEBUG_ASSERT(m_pendingTrackIds.size() <=
            static_cast<size_t>(m_dequeuedTracksCount));
    const int finishedTracksCount =
            m_dequeuedTracksCount - static_cast<int>(m_pendingTrackIds.size());

    // Accumulate the progress of all workers that currently report
    // a valid (non-unknown) progress value
    AnalyzerProgress workerProgressSum = 0;
    int workerProgressCount = 0;
    for (const auto& worker: m_workers) {
        const AnalyzerProgress workerProgress = worker.analyzerProgress();
        if (workerProgress >= kAnalyzerProgressNone) {
            workerProgressSum += workerProgress;
            ++workerProgressCount;
        }
    }
    // The following algorithm/heuristic is used for calculating the
    // amortized analysis progress (current track number + current
    // track progress) across all worker threads. It results in a
    // simple and almost linear progress display when multiple threads
    // are running in parallel. It also covers the expected behavior
    // for the single-threaded case. The receiver of progress updates
    // should not need to know how many threads are actually processing
    // tracks concurrently behind the scenes.
    if (workerProgressCount > 0) {
        // The heuristic below relies on each worker's progress being
        // normalized to the range [0, 1]
        DEBUG_ASSERT(kAnalyzerProgressNone == 0);
        DEBUG_ASSERT(kAnalyzerProgressDone == 1);
        const int inProgressCount =
                math_max(1, int(std::ceil(workerProgressSum)));
        // Fractional part of the summed progress serves as the
        // progress of the virtual "current" track
        const AnalyzerProgress currentTrackProgress =
                workerProgressSum - std::floor(workerProgressSum);
        // The calculation of inProgressCount is only an approximation.
        // In some situations the calculated virtual current track number
        // = finishedTracksCount + inProgressCount exceeds the upper
        // bound m_dequeuedTracksCount. Using the minimum of both values
        // is an appropriate choice for reporting continuous progress.
        const int currentTrackNumber =
                math_min(finishedTracksCount + inProgressCount, m_dequeuedTracksCount);
        // The combination of the values current count (primary) and current
        // progress (secondary) should never decrease to avoid confusion
        if (m_currentTrackNumber < currentTrackNumber) {
            m_currentTrackNumber = currentTrackNumber;
            // Unconditional progress update
            m_currentTrackProgress = currentTrackProgress;
        } else if (m_currentTrackNumber == currentTrackNumber) {
            // Conditional progress update if current count didn't change
            if (m_currentTrackProgress >= kAnalyzerProgressNone) {
                // Current progress should not decrease while the count is constant
                m_currentTrackProgress = math_max(m_currentTrackProgress, currentTrackProgress);
            } else {
                // Initialize current progress
                m_currentTrackProgress = currentTrackProgress;
            }
        }
    } else {
        // No worker is reporting progress: only advance the track
        // number based on the finished count, never move it backwards
        if (m_currentTrackNumber < finishedTracksCount) {
            m_currentTrackNumber = finishedTracksCount;
        }
    }
    // Total = already dequeued tracks + tracks still waiting in the queue
    const int totalTracksCount =
            m_dequeuedTracksCount + static_cast<int>(m_queuedTrackIds.size());
    DEBUG_ASSERT(m_currentTrackNumber <= m_dequeuedTracksCount);
    DEBUG_ASSERT(m_dequeuedTracksCount <= totalTracksCount);
    emit progress(
            m_currentTrackProgress,
            m_currentTrackNumber,
            totalTracksCount);
}
174 
// Handles a progress signal from one of the worker threads
// (connected in the constructor). Updates the worker's bookkeeping
// according to the reported thread state and then re-emits the
// aggregated progress via emitProgressOrFinished().
void TrackAnalysisScheduler::onWorkerThreadProgress(
        int threadId,
        AnalyzerThreadState threadState,
        TrackId trackId,
        AnalyzerProgress analyzerProgress) {
    if (kLogger.traceEnabled()) {
        kLogger.trace() << "onWorkerThreadProgress"
                << threadId
                << int(threadState)
                << trackId
                << analyzerProgress;
    }
    // threadId doubles as the index of the worker in m_workers
    auto& worker = m_workers.at(threadId);
    switch (threadState) {
    case AnalyzerThreadState::Void:
        // No track and no progress expected in this state
        DEBUG_ASSERT(!trackId.isValid());
        DEBUG_ASSERT(analyzerProgress == kAnalyzerProgressUnknown);
        break;
    case AnalyzerThreadState::Idle:
        DEBUG_ASSERT(!trackId.isValid());
        DEBUG_ASSERT(analyzerProgress == kAnalyzerProgressUnknown);
        worker.onAnalyzerProgress(analyzerProgress);
        // An idle worker is immediately fed with the next queued track
        submitNextTrack(&worker);
        break;
    case AnalyzerThreadState::Busy:
        DEBUG_ASSERT(trackId.isValid());
        // Ignore delayed signals for tracks that are no longer pending
        if (m_pendingTrackIds.find(trackId) != m_pendingTrackIds.end()) {
            // A busy worker must report partial progress in [None, Done)
            DEBUG_ASSERT(analyzerProgress != kAnalyzerProgressUnknown);
            DEBUG_ASSERT(analyzerProgress < kAnalyzerProgressDone);
            worker.onAnalyzerProgress(analyzerProgress);
            emit trackProgress(trackId, analyzerProgress);
        }
        break;
    case AnalyzerThreadState::Done:
        DEBUG_ASSERT(trackId.isValid());
        // Ignore delayed signals for tracks that are no longer pending
        if (m_pendingTrackIds.find(trackId) != m_pendingTrackIds.end()) {
            DEBUG_ASSERT((analyzerProgress == kAnalyzerProgressDone) // success
                    || (analyzerProgress == kAnalyzerProgressUnknown)); // failure
            // The track is finished (successfully or not) and no
            // longer pending
            m_pendingTrackIds.erase(trackId);
            worker.onAnalyzerProgress(analyzerProgress);
            emit trackProgress(trackId, analyzerProgress);
        }
        break;
    case AnalyzerThreadState::Exit:
        DEBUG_ASSERT(!trackId.isValid());
        DEBUG_ASSERT(analyzerProgress == kAnalyzerProgressUnknown);
        // Detach the thread from the worker; afterwards the worker
        // must evaluate to false (see the following assertion)
        worker.onThreadExit();
        DEBUG_ASSERT(!worker);
        break;
    default:
        DEBUG_ASSERT(!"Unhandled signal from worker thread");
    }
    // Always recompute and publish the aggregated progress
    emitProgressOrFinished();
}
231 
// Appends a single track id to the analysis queue. Returns true if
// the id was accepted, false if it was invalid. The suspended worker
// threads are deliberately NOT woken up here (see comment below).
bool TrackAnalysisScheduler::scheduleTrackById(TrackId trackId) {
    VERIFY_OR_DEBUG_ASSERT(trackId.isValid()) {
        qWarning()
                << "Cannot schedule track with invalid id"
                << trackId;
        return false;
    }
    m_queuedTrackIds.push_back(trackId);
    // Don't wake up the suspended thread now to avoid race conditions
    // if multiple threads are added in a row by calling this function
    // multiple times. The caller is responsible to finish the scheduling
    // of multiple tracks with resume().
    return true;
}
246 
scheduleTracksById(const QList<TrackId> & trackIds)247 int TrackAnalysisScheduler::scheduleTracksById(const QList<TrackId>& trackIds) {
248     int scheduledCount = 0;
249     for (auto trackId: trackIds) {
250         if (scheduleTrackById(std::move(trackId))) {
251             ++scheduledCount;
252         }
253     }
254     return scheduledCount;
255 }
256 
suspend()257 void TrackAnalysisScheduler::suspend() {
258     kLogger.debug() << "Suspending";
259     for (auto& worker: m_workers) {
260         worker.suspendThread();
261     }
262 }
263 
resume()264 void TrackAnalysisScheduler::resume() {
265     kLogger.debug() << "Resuming";
266     for (auto& worker: m_workers) {
267         worker.resumeThread();
268     }
269 }
270 
// Tries to hand the next queued track over to the given worker.
// Returns true if a track was submitted. Tracks that cannot be
// loaded or are duplicates of already-pending tracks are skipped
// (popped and counted as dequeued); only a busy worker causes an
// early exit WITHOUT popping the head of the queue.
bool TrackAnalysisScheduler::submitNextTrack(Worker* worker) {
    DEBUG_ASSERT(worker);
    while (!m_queuedTrackIds.empty()) {
        TrackId nextTrackId = m_queuedTrackIds.front();
        DEBUG_ASSERT(nextTrackId.isValid());
        if (nextTrackId.isValid()) {
            // Resolve the id into an actual track object
            TrackPointer nextTrack =
                    m_library->trackCollection().getTrackById(nextTrackId);
            if (nextTrack) {
                // insert() only succeeds if the id is not yet pending,
                // which guards against analyzing the same track twice
                if (m_pendingTrackIds.insert(nextTrackId).second) {
                    if (worker->submitNextTrack(std::move(nextTrack))) {
                        // Successfully submitted: dequeue and report
                        m_queuedTrackIds.pop_front();
                        ++m_dequeuedTracksCount;
                        return true;
                    } else {
                        // The worker may already have been assigned new tasks
                        // in the mean time, nothing to worry about.
                        // Roll back the tentative pending insertion
                        m_pendingTrackIds.erase(nextTrackId);
                        kLogger.debug()
                                << "Failed to submit next track - worker thread"
                                << worker->thread()->id()
                                << "is busy";
                        // Early exit to avoid popping the next track from the queue (see below)!
                        return false;
                    }
                } else {
                    // This track is currently analyzed by one of the workers
                    kLogger.debug()
                            << "Skipping duplicate track id"
                            << nextTrackId;
                }
            } else {
                kLogger.warning()
                        << "Failed to load track by id"
                        << nextTrackId;
            }
        } else {
            kLogger.warning()
                    << "Invalid track id"
                    << nextTrackId;
        }
        // Skip this track
        // Skipped tracks still count as dequeued so that the progress
        // accounting remains consistent
        m_queuedTrackIds.pop_front();
        ++m_dequeuedTracksCount;
    }
    return false;
}
318 
stop()319 void TrackAnalysisScheduler::stop() {
320     kLogger.debug() << "Stopping";
321     for (auto& worker: m_workers) {
322         worker.stopThread();
323     }
324     // The worker threads are still running at this point
325     // and m_workers must not be modified!
326     m_queuedTrackIds.clear();
327     m_pendingTrackIds.clear();
328     DEBUG_ASSERT((allTracksFinished()));
329 }
330 
stopAndCollectScheduledTrackIds()331 QList<TrackId> TrackAnalysisScheduler::stopAndCollectScheduledTrackIds() {
332     QList<TrackId> scheduledTrackIds;
333     scheduledTrackIds.reserve(static_cast<int>(m_queuedTrackIds.size() + m_pendingTrackIds.size()));
334     for (auto queuedTrackId: m_queuedTrackIds) {
335         scheduledTrackIds.append(std::move(queuedTrackId));
336     }
337     for (auto pendingTrackId: m_pendingTrackIds) {
338         scheduledTrackIds.append(std::move(pendingTrackId));
339     }
340     // Stopping the scheduler will clear all queued and pending tracks,
341     // so we need to do this after we have collected all scheduled tracks!
342     stop();
343     return scheduledTrackIds;
344 }
345