1 #include "analyzer/trackanalysisscheduler.h"
2
3 #include "library/library.h"
4 #include "library/trackcollection.h"
5 #include "moc_trackanalysisscheduler.cpp"
6 #include "util/logger.h"
7
8 namespace {
9
10 mixxx::Logger kLogger("TrackAnalysisScheduler");
11
12 constexpr QThread::Priority kWorkerThreadPriority = QThread::LowPriority;
13
14 // Maximum frequency of progress updates
15 constexpr std::chrono::milliseconds kProgressInhibitDuration(100);
16
deleteTrackAnalysisScheduler(TrackAnalysisScheduler * plainPtr)17 void deleteTrackAnalysisScheduler(TrackAnalysisScheduler* plainPtr) {
18 if (plainPtr) {
19 // Trigger stop
20 plainPtr->stop();
21 // Release ownership and let Qt delete the queue later
22 plainPtr->deleteLater();
23 }
24 }
25
26 } // anonymous namespace
27
NullPointer()28 TrackAnalysisScheduler::NullPointer::NullPointer()
29 : Pointer(nullptr, [](TrackAnalysisScheduler*){}) {
30 }
31
32 //static
createInstance(Library * library,int numWorkerThreads,const UserSettingsPointer & pConfig,AnalyzerModeFlags modeFlags)33 TrackAnalysisScheduler::Pointer TrackAnalysisScheduler::createInstance(
34 Library* library,
35 int numWorkerThreads,
36 const UserSettingsPointer& pConfig,
37 AnalyzerModeFlags modeFlags) {
38 return Pointer(new TrackAnalysisScheduler(
39 library,
40 numWorkerThreads,
41 pConfig,
42 modeFlags),
43 deleteTrackAnalysisScheduler);
44 }
45
TrackAnalysisScheduler(Library * library,int numWorkerThreads,const UserSettingsPointer & pConfig,AnalyzerModeFlags modeFlags)46 TrackAnalysisScheduler::TrackAnalysisScheduler(
47 Library* library,
48 int numWorkerThreads,
49 const UserSettingsPointer& pConfig,
50 AnalyzerModeFlags modeFlags)
51 : m_library(library),
52 m_currentTrackProgress(kAnalyzerProgressUnknown),
53 m_currentTrackNumber(0),
54 m_dequeuedTracksCount(0),
55 // The first signal should always be emitted
56 m_lastProgressEmittedAt(Clock::now() - kProgressInhibitDuration) {
57 VERIFY_OR_DEBUG_ASSERT(numWorkerThreads > 0) {
58 kLogger.warning()
59 << "Invalid number of worker threads:"
60 << numWorkerThreads;
61 } else {
62 kLogger.debug()
63 << "Starting"
64 << numWorkerThreads
65 << "worker threads. Priority: "
66 << (modeFlags & AnalyzerModeFlags::LowPriority ? "low" : "normal");
67 }
68 // 1st pass: Create worker threads
69 m_workers.reserve(numWorkerThreads);
70 for (int threadId = 0; threadId < numWorkerThreads; ++threadId) {
71 m_workers.emplace_back(AnalyzerThread::createInstance(
72 threadId,
73 library->dbConnectionPool(),
74 pConfig,
75 modeFlags));
76 connect(m_workers.back().thread(), &AnalyzerThread::progress,
77 this, &TrackAnalysisScheduler::onWorkerThreadProgress);
78 }
79 // 2nd pass: Start worker threads in a suspended state
80 for (const auto& worker: m_workers) {
81 worker.thread()->suspend();
82 worker.thread()->start(kWorkerThreadPriority);
83 }
84 }
85
~TrackAnalysisScheduler()86 TrackAnalysisScheduler::~TrackAnalysisScheduler() {
87 kLogger.debug() << "Destroying";
88 }
89
emitProgressOrFinished()90 void TrackAnalysisScheduler::emitProgressOrFinished() {
91 // The finished() signal is emitted regardless of when the last
92 // signal has been emitted
93 if (allTracksFinished()) {
94 m_currentTrackProgress = kAnalyzerProgressUnknown;
95 m_currentTrackNumber = 0;
96 m_dequeuedTracksCount = 0;
97 emit finished();
98 return;
99 }
100
101 const auto now = Clock::now();
102 if (now < (m_lastProgressEmittedAt + kProgressInhibitDuration)) {
103 // Inhibit signal
104 return;
105 }
106 m_lastProgressEmittedAt = now;
107
108 DEBUG_ASSERT(m_pendingTrackIds.size() <=
109 static_cast<size_t>(m_dequeuedTracksCount));
110 const int finishedTracksCount =
111 m_dequeuedTracksCount - static_cast<int>(m_pendingTrackIds.size());
112
113 AnalyzerProgress workerProgressSum = 0;
114 int workerProgressCount = 0;
115 for (const auto& worker: m_workers) {
116 const AnalyzerProgress workerProgress = worker.analyzerProgress();
117 if (workerProgress >= kAnalyzerProgressNone) {
118 workerProgressSum += workerProgress;
119 ++workerProgressCount;
120 }
121 }
122 // The following algorithm/heuristic is used for calculating the
123 // amortized analysis progress (current track number + current
124 // track progress) across all worker threads. It results in a
125 // simple and almost linear progress display when multiple threads
126 // are running in parallel. It also covers the expected behavior
127 // for the single-threaded case. The receiver of progress updates
128 // should not need to know how many threads are actually processing
129 // tracks concurrently behind the scenes.
130 if (workerProgressCount > 0) {
131 DEBUG_ASSERT(kAnalyzerProgressNone == 0);
132 DEBUG_ASSERT(kAnalyzerProgressDone == 1);
133 const int inProgressCount =
134 math_max(1, int(std::ceil(workerProgressSum)));
135 const AnalyzerProgress currentTrackProgress =
136 workerProgressSum - std::floor(workerProgressSum);
137 // The calculation of inProgressCount is only an approximation.
138 // In some situations the calculated virtual current track number
139 // = finishedTracksCount + inProgressCount exceeds the upper
140 // bound m_dequeuedTracksCount. Using the minimum of both values
141 // is an appropriate choice for reporting continuous progress.
142 const int currentTrackNumber =
143 math_min(finishedTracksCount + inProgressCount, m_dequeuedTracksCount);
144 // The combination of the values current count (primary) and current
145 // progress (secondary) should never decrease to avoid confusion
146 if (m_currentTrackNumber < currentTrackNumber) {
147 m_currentTrackNumber = currentTrackNumber;
148 // Unconditional progress update
149 m_currentTrackProgress = currentTrackProgress;
150 } else if (m_currentTrackNumber == currentTrackNumber) {
151 // Conditional progress update if current count didn't change
152 if (m_currentTrackProgress >= kAnalyzerProgressNone) {
153 // Current progress should not decrease while the count is constant
154 m_currentTrackProgress = math_max(m_currentTrackProgress, currentTrackProgress);
155 } else {
156 // Initialize current progress
157 m_currentTrackProgress = currentTrackProgress;
158 }
159 }
160 } else {
161 if (m_currentTrackNumber < finishedTracksCount) {
162 m_currentTrackNumber = finishedTracksCount;
163 }
164 }
165 const int totalTracksCount =
166 m_dequeuedTracksCount + static_cast<int>(m_queuedTrackIds.size());
167 DEBUG_ASSERT(m_currentTrackNumber <= m_dequeuedTracksCount);
168 DEBUG_ASSERT(m_dequeuedTracksCount <= totalTracksCount);
169 emit progress(
170 m_currentTrackProgress,
171 m_currentTrackNumber,
172 totalTracksCount);
173 }
174
onWorkerThreadProgress(int threadId,AnalyzerThreadState threadState,TrackId trackId,AnalyzerProgress analyzerProgress)175 void TrackAnalysisScheduler::onWorkerThreadProgress(
176 int threadId,
177 AnalyzerThreadState threadState,
178 TrackId trackId,
179 AnalyzerProgress analyzerProgress) {
180 if (kLogger.traceEnabled()) {
181 kLogger.trace() << "onWorkerThreadProgress"
182 << threadId
183 << int(threadState)
184 << trackId
185 << analyzerProgress;
186 }
187 auto& worker = m_workers.at(threadId);
188 switch (threadState) {
189 case AnalyzerThreadState::Void:
190 DEBUG_ASSERT(!trackId.isValid());
191 DEBUG_ASSERT(analyzerProgress == kAnalyzerProgressUnknown);
192 break;
193 case AnalyzerThreadState::Idle:
194 DEBUG_ASSERT(!trackId.isValid());
195 DEBUG_ASSERT(analyzerProgress == kAnalyzerProgressUnknown);
196 worker.onAnalyzerProgress(analyzerProgress);
197 submitNextTrack(&worker);
198 break;
199 case AnalyzerThreadState::Busy:
200 DEBUG_ASSERT(trackId.isValid());
201 // Ignore delayed signals for tracks that are no longer pending
202 if (m_pendingTrackIds.find(trackId) != m_pendingTrackIds.end()) {
203 DEBUG_ASSERT(analyzerProgress != kAnalyzerProgressUnknown);
204 DEBUG_ASSERT(analyzerProgress < kAnalyzerProgressDone);
205 worker.onAnalyzerProgress(analyzerProgress);
206 emit trackProgress(trackId, analyzerProgress);
207 }
208 break;
209 case AnalyzerThreadState::Done:
210 DEBUG_ASSERT(trackId.isValid());
211 // Ignore delayed signals for tracks that are no longer pending
212 if (m_pendingTrackIds.find(trackId) != m_pendingTrackIds.end()) {
213 DEBUG_ASSERT((analyzerProgress == kAnalyzerProgressDone) // success
214 || (analyzerProgress == kAnalyzerProgressUnknown)); // failure
215 m_pendingTrackIds.erase(trackId);
216 worker.onAnalyzerProgress(analyzerProgress);
217 emit trackProgress(trackId, analyzerProgress);
218 }
219 break;
220 case AnalyzerThreadState::Exit:
221 DEBUG_ASSERT(!trackId.isValid());
222 DEBUG_ASSERT(analyzerProgress == kAnalyzerProgressUnknown);
223 worker.onThreadExit();
224 DEBUG_ASSERT(!worker);
225 break;
226 default:
227 DEBUG_ASSERT(!"Unhandled signal from worker thread");
228 }
229 emitProgressOrFinished();
230 }
231
scheduleTrackById(TrackId trackId)232 bool TrackAnalysisScheduler::scheduleTrackById(TrackId trackId) {
233 VERIFY_OR_DEBUG_ASSERT(trackId.isValid()) {
234 qWarning()
235 << "Cannot schedule track with invalid id"
236 << trackId;
237 return false;
238 }
239 m_queuedTrackIds.push_back(trackId);
240 // Don't wake up the suspended thread now to avoid race conditions
241 // if multiple threads are added in a row by calling this function
242 // multiple times. The caller is responsible to finish the scheduling
243 // of multiple tracks with resume().
244 return true;
245 }
246
scheduleTracksById(const QList<TrackId> & trackIds)247 int TrackAnalysisScheduler::scheduleTracksById(const QList<TrackId>& trackIds) {
248 int scheduledCount = 0;
249 for (auto trackId: trackIds) {
250 if (scheduleTrackById(std::move(trackId))) {
251 ++scheduledCount;
252 }
253 }
254 return scheduledCount;
255 }
256
suspend()257 void TrackAnalysisScheduler::suspend() {
258 kLogger.debug() << "Suspending";
259 for (auto& worker: m_workers) {
260 worker.suspendThread();
261 }
262 }
263
resume()264 void TrackAnalysisScheduler::resume() {
265 kLogger.debug() << "Resuming";
266 for (auto& worker: m_workers) {
267 worker.resumeThread();
268 }
269 }
270
submitNextTrack(Worker * worker)271 bool TrackAnalysisScheduler::submitNextTrack(Worker* worker) {
272 DEBUG_ASSERT(worker);
273 while (!m_queuedTrackIds.empty()) {
274 TrackId nextTrackId = m_queuedTrackIds.front();
275 DEBUG_ASSERT(nextTrackId.isValid());
276 if (nextTrackId.isValid()) {
277 TrackPointer nextTrack =
278 m_library->trackCollection().getTrackById(nextTrackId);
279 if (nextTrack) {
280 if (m_pendingTrackIds.insert(nextTrackId).second) {
281 if (worker->submitNextTrack(std::move(nextTrack))) {
282 m_queuedTrackIds.pop_front();
283 ++m_dequeuedTracksCount;
284 return true;
285 } else {
286 // The worker may already have been assigned new tasks
287 // in the mean time, nothing to worry about.
288 m_pendingTrackIds.erase(nextTrackId);
289 kLogger.debug()
290 << "Failed to submit next track - worker thread"
291 << worker->thread()->id()
292 << "is busy";
293 // Early exit to avoid popping the next track from the queue (see below)!
294 return false;
295 }
296 } else {
297 // This track is currently analyzed by one of the workers
298 kLogger.debug()
299 << "Skipping duplicate track id"
300 << nextTrackId;
301 }
302 } else {
303 kLogger.warning()
304 << "Failed to load track by id"
305 << nextTrackId;
306 }
307 } else {
308 kLogger.warning()
309 << "Invalid track id"
310 << nextTrackId;
311 }
312 // Skip this track
313 m_queuedTrackIds.pop_front();
314 ++m_dequeuedTracksCount;
315 }
316 return false;
317 }
318
stop()319 void TrackAnalysisScheduler::stop() {
320 kLogger.debug() << "Stopping";
321 for (auto& worker: m_workers) {
322 worker.stopThread();
323 }
324 // The worker threads are still running at this point
325 // and m_workers must not be modified!
326 m_queuedTrackIds.clear();
327 m_pendingTrackIds.clear();
328 DEBUG_ASSERT((allTracksFinished()));
329 }
330
stopAndCollectScheduledTrackIds()331 QList<TrackId> TrackAnalysisScheduler::stopAndCollectScheduledTrackIds() {
332 QList<TrackId> scheduledTrackIds;
333 scheduledTrackIds.reserve(static_cast<int>(m_queuedTrackIds.size() + m_pendingTrackIds.size()));
334 for (auto queuedTrackId: m_queuedTrackIds) {
335 scheduledTrackIds.append(std::move(queuedTrackId));
336 }
337 for (auto pendingTrackId: m_pendingTrackIds) {
338 scheduledTrackIds.append(std::move(pendingTrackId));
339 }
340 // Stopping the scheduler will clear all queued and pending tracks,
341 // so we need to do this after we have collected all scheduled tracks!
342 stop();
343 return scheduledTrackIds;
344 }
345