1 /*
2  * MixerWorkerThread.cpp - implementation of MixerWorkerThread
3  *
4  * Copyright (c) 2009-2014 Tobias Doerffel <tobydox/at/users.sourceforge.net>
5  *
6  * This file is part of LMMS - https://lmms.io
7  *
8  * This program is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This program is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * General Public License for more details.
17  *
18  * You should have received a copy of the GNU General Public
19  * License along with this program (see COPYING); if not, write to the
20  * Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor,
21  * Boston, MA 02110-1301 USA.
22  *
23  */
24 
25 #include "MixerWorkerThread.h"
26 
27 #include "denormals.h"
28 #include <QDebug>
29 #include <QMutex>
30 #include <QWaitCondition>
31 #include "ThreadableJob.h"
32 #include "Mixer.h"
33 
34 MixerWorkerThread::JobQueue MixerWorkerThread::globalJobQueue;
35 QWaitCondition * MixerWorkerThread::queueReadyWaitCond = NULL;
36 QList<MixerWorkerThread *> MixerWorkerThread::workerThreads;
37 
38 
39 
40 // implementation of internal JobQueue
reset(OperationMode _opMode)41 void MixerWorkerThread::JobQueue::reset( OperationMode _opMode )
42 {
43 	m_writeIndex = 0;
44 	m_itemsDone = 0;
45 	m_opMode = _opMode;
46 }
47 
48 
49 
50 
addJob(ThreadableJob * _job)51 void MixerWorkerThread::JobQueue::addJob( ThreadableJob * _job )
52 {
53 	if( _job->requiresProcessing() )
54 	{
55 		// update job state
56 		_job->queue();
57 		// actually queue the job via atomic operations
58 		auto index = m_writeIndex.fetchAndAddOrdered(1);
59 		if (index < JOB_QUEUE_SIZE) {
60 			m_items[index] = _job;
61 		} else {
62 			qWarning() << "Job queue is full!";
63 			m_itemsDone.fetchAndAddOrdered(1);
64 		}
65 	}
66 }
67 
68 
69 
run()70 void MixerWorkerThread::JobQueue::run()
71 {
72 	bool processedJob = true;
73 	while( processedJob && (int) m_itemsDone < (int) m_writeIndex )
74 	{
75 		processedJob = false;
76 		for( int i = 0; i < m_writeIndex && i < JOB_QUEUE_SIZE; ++i )
77 		{
78 			ThreadableJob * job = m_items[i].fetchAndStoreOrdered( NULL );
79 			if( job )
80 			{
81 				job->process();
82 				processedJob = true;
83 				m_itemsDone.fetchAndAddOrdered( 1 );
84 			}
85 		}
86 		// always exit loop if we're not in dynamic mode
87 		processedJob = processedJob && ( m_opMode == Dynamic );
88 	}
89 }
90 
91 
92 
93 
wait()94 void MixerWorkerThread::JobQueue::wait()
95 {
96 	while( (int) m_itemsDone < (int) m_writeIndex )
97 	{
98 #if defined(LMMS_HOST_X86) || defined(LMMS_HOST_X86_64)
99 		asm( "pause" );
100 #endif
101 	}
102 }
103 
104 
105 
106 
107 
108 // implementation of worker threads
109 
MixerWorkerThread(Mixer * mixer)110 MixerWorkerThread::MixerWorkerThread( Mixer* mixer ) :
111 	QThread( mixer ),
112 	m_quit( false )
113 {
114 	// initialize global static data
115 	if( queueReadyWaitCond == NULL )
116 	{
117 		queueReadyWaitCond = new QWaitCondition;
118 	}
119 
120 	// keep track of all instantiated worker threads - this is used for
121 	// processing the last worker thread "inline", see comments in
122 	// MixerWorkerThread::startAndWaitForJobs() for details
123 	workerThreads << this;
124 
125 	resetJobQueue();
126 }
127 
128 
129 
130 
~MixerWorkerThread()131 MixerWorkerThread::~MixerWorkerThread()
132 {
133 	workerThreads.removeAll( this );
134 }
135 
136 
137 
138 
quit()139 void MixerWorkerThread::quit()
140 {
141 	m_quit = true;
142 	resetJobQueue();
143 }
144 
145 
146 
147 
startAndWaitForJobs()148 void MixerWorkerThread::startAndWaitForJobs()
149 {
150 	queueReadyWaitCond->wakeAll();
151 	// The last worker-thread is never started. Instead it's processed "inline"
152 	// i.e. within the global Mixer thread. This way we can reduce latencies
153 	// that otherwise would be caused by synchronizing with another thread.
154 	globalJobQueue.run();
155 	globalJobQueue.wait();
156 }
157 
158 
159 
160 
run()161 void MixerWorkerThread::run()
162 {
163 	MemoryManager::ThreadGuard mmThreadGuard; Q_UNUSED(mmThreadGuard);
164 	disable_denormals();
165 
166 	QMutex m;
167 	while( m_quit == false )
168 	{
169 		m.lock();
170 		queueReadyWaitCond->wait( &m );
171 		globalJobQueue.run();
172 		m.unlock();
173 	}
174 }
175 
176 
177