1 /* === This file is part of Calamares - <https://calamares.io> ===
2 *
3 * SPDX-FileCopyrightText: 2014-2015 Teo Mrnjavac <teo@kde.org>
4 * SPDX-FileCopyrightText: 2018 Adriaan de Groot <groot@kde.org>
5 * SPDX-License-Identifier: GPL-3.0-or-later
6 *
7 * Calamares is Free Software: see the License-Identifier above.
8 *
9 */
10
#include "JobQueue.h"

#include "CalamaresConfig.h"
#include "GlobalStorage.h"
#include "Job.h"
#include "utils/Logger.h"

#include <QMutex>
#include <QMutexLocker>
#include <QThread>

#include <memory>
#include <numeric>
23
24 namespace Calamares
25 {
26
/** @brief A job plus the bookkeeping needed to report overall progress
 *
 * Instances are created with aggregate initialization in the order
 * ( cumulative, weight, job ), so the member order matters.
 */
struct WeightedJob
{
    /** @brief Cumulative weight **before** this job starts
     *
     * This is calculated as jobs come in: it is the cumulative
     * weight plus the weight of the previously queued job
     * (or 0 for the first job in the queue).
     */
    qreal cumulative = 0.0;
    /** @brief Weight of the job within the module's jobs
     *
     * When a list of jobs is added from a particular module,
     * the jobs are weighted relative to that module's overall weight
     * **and** the other jobs in the list, so that each job
     * gets its share:
     *      ( job-weight / total-job-weight ) * module-weight
     */
    qreal weight = 0.0;

    /// @brief The job these weights belong to
    job_ptr job;
};
/// @brief Ordered list of jobs with their weights
using WeightedJobList = QList< WeightedJob >;
47
48 class JobThread : public QThread
49 {
50 Q_OBJECT
51 public:
JobThread(JobQueue * queue)52 JobThread( JobQueue* queue )
53 : QThread( queue )
54 , m_queue( queue )
55 , m_jobIndex( 0 )
56 {
57 }
58
59 ~JobThread() override;
60
finalize()61 void finalize()
62 {
63 Q_ASSERT( m_runningJobs->isEmpty() );
64 QMutexLocker qlock( &m_enqueMutex );
65 QMutexLocker rlock( &m_runMutex );
66 std::swap( m_runningJobs, m_queuedJobs );
67 m_overallQueueWeight
68 = m_runningJobs->isEmpty() ? 0.0 : ( m_runningJobs->last().cumulative + m_runningJobs->last().weight );
69 if ( m_overallQueueWeight < 1 )
70 {
71 m_overallQueueWeight = 1.0;
72 }
73
74 cDebug() << "There are" << m_runningJobs->count() << "jobs, total weight" << m_overallQueueWeight;
75 int c = 0;
76 for ( const auto& j : *m_runningJobs )
77 {
78 cDebug() << Logger::SubEntry << "Job" << ( c + 1 ) << j.job->prettyName() << "+wt" << j.weight << "tot.wt"
79 << ( j.cumulative + j.weight );
80 c++;
81 }
82 }
83
enqueue(int moduleWeight,const JobList & jobs)84 void enqueue( int moduleWeight, const JobList& jobs )
85 {
86 QMutexLocker qlock( &m_enqueMutex );
87
88 qreal cumulative
89 = m_queuedJobs->isEmpty() ? 0.0 : ( m_queuedJobs->last().cumulative + m_queuedJobs->last().weight );
90
91 qreal totalJobWeight
92 = std::accumulate( jobs.cbegin(), jobs.cend(), qreal( 0.0 ), []( qreal total, const job_ptr& j ) {
93 return total + j->getJobWeight();
94 } );
95 if ( totalJobWeight < 1 )
96 {
97 totalJobWeight = 1.0;
98 }
99
100 for ( const auto& j : jobs )
101 {
102 qreal jobContribution = ( j->getJobWeight() / totalJobWeight ) * moduleWeight;
103 m_queuedJobs->append( WeightedJob { cumulative, jobContribution, j } );
104 cumulative += jobContribution;
105 }
106 }
107
run()108 void run() override
109 {
110 QMutexLocker rlock( &m_runMutex );
111 bool failureEncountered = false;
112 QString message; ///< Filled in with errors
113 QString details;
114
115 Logger::Once o;
116 m_jobIndex = 0;
117 for ( const auto& jobitem : *m_runningJobs )
118 {
119 if ( failureEncountered && !jobitem.job->isEmergency() )
120 {
121 cDebug() << o << "Skipping non-emergency job" << jobitem.job->prettyName();
122 }
123 else
124 {
125 cDebug() << o << "Starting" << ( failureEncountered ? "EMERGENCY JOB" : "job" ) << jobitem.job->prettyName()
126 << '(' << ( m_jobIndex + 1 ) << '/' << m_runningJobs->count() << ')';
127 o.refresh(); // So next time it shows the function header again
128 emitProgress( 0.0 ); // 0% for *this job*
129 connect( jobitem.job.data(), &Job::progress, this, &JobThread::emitProgress );
130 auto result = jobitem.job->exec();
131 if ( !failureEncountered && !result )
132 {
133 // so this is the first failure
134 failureEncountered = true;
135 message = result.message();
136 details = result.details();
137 }
138 QThread::msleep( 16 ); // Very brief rest before reporting the job as complete
139 emitProgress( 1.0 ); // 100% for *this job*
140 }
141 m_jobIndex++;
142 }
143 if ( failureEncountered )
144 {
145 QMetaObject::invokeMethod(
146 m_queue, "failed", Qt::QueuedConnection, Q_ARG( QString, message ), Q_ARG( QString, details ) );
147 }
148 else
149 {
150 emitProgress( 1.0 );
151 }
152 m_runningJobs->clear();
153 QMetaObject::invokeMethod( m_queue, "finish", Qt::QueuedConnection );
154 }
155
156 /** @brief The names of the queued (not running!) jobs.
157 */
queuedJobs() const158 QStringList queuedJobs() const
159 {
160 QMutexLocker qlock( &m_enqueMutex );
161 QStringList l;
162 l.reserve( m_queuedJobs->count() );
163 for ( const auto& j : *m_queuedJobs )
164 {
165 l << j.job->prettyName();
166 }
167 return l;
168 }
169
170 private:
171 /* This is called **only** from run(), while m_runMutex is
172 * already locked, so we can use the m_runningJobs member safely.
173 */
emitProgress(qreal percentage) const174 void emitProgress( qreal percentage ) const
175 {
176 percentage = qBound( 0.0, percentage, 1.0 );
177
178 QString message;
179 qreal progress = 0.0;
180 if ( m_jobIndex < m_runningJobs->count() )
181 {
182 const auto& jobitem = m_runningJobs->at( m_jobIndex );
183 progress = ( jobitem.cumulative + jobitem.weight * percentage ) / m_overallQueueWeight;
184 message = jobitem.job->prettyStatusMessage();
185 // In progress reports at the start of a job (e.g. when the queue
186 // starts the job, or if the job itself reports 0.0) be more
187 // accepting in what gets reported: jobs with no status fall
188 // back to description and name, whichever is non-empty.
189 if ( percentage == 0.0 && message.isEmpty() )
190 {
191 message = jobitem.job->prettyDescription();
192 if ( message.isEmpty() )
193 {
194 message = jobitem.job->prettyName();
195 }
196 }
197 }
198 else
199 {
200 progress = 1.0;
201 message = tr( "Done" );
202 }
203 QMetaObject::invokeMethod(
204 m_queue, "progress", Qt::QueuedConnection, Q_ARG( qreal, progress ), Q_ARG( QString, message ) );
205 }
206
207 mutable QMutex m_runMutex;
208 mutable QMutex m_enqueMutex;
209
210 std::unique_ptr< WeightedJobList > m_runningJobs = std::make_unique< WeightedJobList >();
211 std::unique_ptr< WeightedJobList > m_queuedJobs = std::make_unique< WeightedJobList >();
212
213 JobQueue* m_queue;
214 int m_jobIndex = 0; ///< Index into m_runningJobs
215 qreal m_overallQueueWeight = 0.0; ///< cumulation when **all** the jobs are done
216 };
217
~JobThread()218 JobThread::~JobThread() {}
219
220
221 JobQueue* JobQueue::s_instance = nullptr;
222
223 JobQueue*
instance()224 JobQueue::instance()
225 {
226 if ( !s_instance )
227 {
228 cWarning() << "Getting nullptr JobQueue instance.";
229 }
230 return s_instance;
231 }
232
233
/* Creates the queue together with its job thread and its global
 * storage; both are created with this queue as their QObject parent.
 * The new queue registers itself as the instance returned by
 * instance(); there must not be an instance already.
 */
JobQueue::JobQueue( QObject* parent )
    : QObject( parent )
    , m_thread( new JobThread( this ) )
    , m_storage( new GlobalStorage( this ) )
{
    Q_ASSERT( !s_instance );
    s_instance = this;
}
242
243
~JobQueue()244 JobQueue::~JobQueue()
245 {
246 if ( m_thread->isRunning() )
247 {
248 m_thread->terminate();
249 if ( !m_thread->wait( 300 ) )
250 {
251 cError() << "Could not terminate job thread (expect a crash now).";
252 }
253 delete m_thread;
254 }
255
256 delete m_storage;
257 s_instance = nullptr;
258 }
259
260
261 void
start()262 JobQueue::start()
263 {
264 Q_ASSERT( !m_thread->isRunning() );
265 m_thread->finalize();
266 m_finished = false;
267 m_thread->start();
268 }
269
270
271 void
enqueue(int moduleWeight,const JobList & jobs)272 JobQueue::enqueue( int moduleWeight, const JobList& jobs )
273 {
274 Q_ASSERT( !m_thread->isRunning() );
275 m_thread->enqueue( moduleWeight, jobs );
276 emit queueChanged( m_thread->queuedJobs() );
277 }
278
279 void
finish()280 JobQueue::finish()
281 {
282 m_finished = true;
283 emit finished();
284 emit queueChanged( m_thread->queuedJobs() );
285 }
286
/* Returns the global storage object that was created together with
 * this queue.
 */
GlobalStorage*
JobQueue::globalStorage() const
{
    return m_storage;
}
292
293 } // namespace Calamares
294
295 #include "utils/moc-warnings.h"
296
297 #include "JobQueue.moc"
298