1 /* === This file is part of Calamares - <https://calamares.io> ===
2  *
3  *   SPDX-FileCopyrightText: 2014-2015 Teo Mrnjavac <teo@kde.org>
4  *   SPDX-FileCopyrightText: 2018 Adriaan de Groot <groot@kde.org>
5  *   SPDX-License-Identifier: GPL-3.0-or-later
6  *
7  *   Calamares is Free Software: see the License-Identifier above.
8  *
9  */
10 
11 #include "JobQueue.h"
12 
13 #include "CalamaresConfig.h"
14 #include "GlobalStorage.h"
15 #include "Job.h"
16 #include "utils/Logger.h"
17 
18 #include <QMutex>
19 #include <QMutexLocker>
20 #include <QThread>
21 
22 #include <memory>
23 
24 namespace Calamares
25 {
26 
27 struct WeightedJob
28 {
29     /** @brief Cumulative weight **before** this job starts
30      *
31      * This is calculated as jobs come in.
32      */
33     qreal cumulative = 0.0;
34     /** @brief Weight of the job within the module's jobs
35      *
36      * When a list of jobs is added from a particular module,
37      * the jobs are weighted relative to that module's overall weight
38      * **and** the other jobs in the list, so that each job
39      * gets its share:
40      *      ( job-weight / total-job-weight ) * module-weight
41      */
42     qreal weight = 0.0;
43 
44     job_ptr job;
45 };
46 using WeightedJobList = QList< WeightedJob >;
47 
48 class JobThread : public QThread
49 {
50     Q_OBJECT
51 public:
JobThread(JobQueue * queue)52     JobThread( JobQueue* queue )
53         : QThread( queue )
54         , m_queue( queue )
55         , m_jobIndex( 0 )
56     {
57     }
58 
59     ~JobThread() override;
60 
finalize()61     void finalize()
62     {
63         Q_ASSERT( m_runningJobs->isEmpty() );
64         QMutexLocker qlock( &m_enqueMutex );
65         QMutexLocker rlock( &m_runMutex );
66         std::swap( m_runningJobs, m_queuedJobs );
67         m_overallQueueWeight
68             = m_runningJobs->isEmpty() ? 0.0 : ( m_runningJobs->last().cumulative + m_runningJobs->last().weight );
69         if ( m_overallQueueWeight < 1 )
70         {
71             m_overallQueueWeight = 1.0;
72         }
73 
74         cDebug() << "There are" << m_runningJobs->count() << "jobs, total weight" << m_overallQueueWeight;
75         int c = 0;
76         for ( const auto& j : *m_runningJobs )
77         {
78             cDebug() << Logger::SubEntry << "Job" << ( c + 1 ) << j.job->prettyName() << "+wt" << j.weight << "tot.wt"
79                      << ( j.cumulative + j.weight );
80             c++;
81         }
82     }
83 
enqueue(int moduleWeight,const JobList & jobs)84     void enqueue( int moduleWeight, const JobList& jobs )
85     {
86         QMutexLocker qlock( &m_enqueMutex );
87 
88         qreal cumulative
89             = m_queuedJobs->isEmpty() ? 0.0 : ( m_queuedJobs->last().cumulative + m_queuedJobs->last().weight );
90 
91         qreal totalJobWeight
92             = std::accumulate( jobs.cbegin(), jobs.cend(), qreal( 0.0 ), []( qreal total, const job_ptr& j ) {
93                   return total + j->getJobWeight();
94               } );
95         if ( totalJobWeight < 1 )
96         {
97             totalJobWeight = 1.0;
98         }
99 
100         for ( const auto& j : jobs )
101         {
102             qreal jobContribution = ( j->getJobWeight() / totalJobWeight ) * moduleWeight;
103             m_queuedJobs->append( WeightedJob { cumulative, jobContribution, j } );
104             cumulative += jobContribution;
105         }
106     }
107 
run()108     void run() override
109     {
110         QMutexLocker rlock( &m_runMutex );
111         bool failureEncountered = false;
112         QString message;  ///< Filled in with errors
113         QString details;
114 
115         Logger::Once o;
116         m_jobIndex = 0;
117         for ( const auto& jobitem : *m_runningJobs )
118         {
119             if ( failureEncountered && !jobitem.job->isEmergency() )
120             {
121                 cDebug() << o << "Skipping non-emergency job" << jobitem.job->prettyName();
122             }
123             else
124             {
125                 cDebug() << o << "Starting" << ( failureEncountered ? "EMERGENCY JOB" : "job" ) << jobitem.job->prettyName()
126                          << '(' << ( m_jobIndex + 1 ) << '/' << m_runningJobs->count() << ')';
127                 o.refresh();  // So next time it shows the function header again
128                 emitProgress( 0.0 );  // 0% for *this job*
129                 connect( jobitem.job.data(), &Job::progress, this, &JobThread::emitProgress );
130                 auto result = jobitem.job->exec();
131                 if ( !failureEncountered && !result )
132                 {
133                     // so this is the first failure
134                     failureEncountered = true;
135                     message = result.message();
136                     details = result.details();
137                 }
138                 QThread::msleep( 16 );  // Very brief rest before reporting the job as complete
139                 emitProgress( 1.0 );  // 100% for *this job*
140             }
141             m_jobIndex++;
142         }
143         if ( failureEncountered )
144         {
145             QMetaObject::invokeMethod(
146                 m_queue, "failed", Qt::QueuedConnection, Q_ARG( QString, message ), Q_ARG( QString, details ) );
147         }
148         else
149         {
150             emitProgress( 1.0 );
151         }
152         m_runningJobs->clear();
153         QMetaObject::invokeMethod( m_queue, "finish", Qt::QueuedConnection );
154     }
155 
156     /** @brief The names of the queued (not running!) jobs.
157      */
queuedJobs() const158     QStringList queuedJobs() const
159     {
160         QMutexLocker qlock( &m_enqueMutex );
161         QStringList l;
162         l.reserve( m_queuedJobs->count() );
163         for ( const auto& j : *m_queuedJobs )
164         {
165             l << j.job->prettyName();
166         }
167         return l;
168     }
169 
170 private:
171     /* This is called **only** from run(), while m_runMutex is
172      * already locked, so we can use the m_runningJobs member safely.
173      */
emitProgress(qreal percentage) const174     void emitProgress( qreal percentage ) const
175     {
176         percentage = qBound( 0.0, percentage, 1.0 );
177 
178         QString message;
179         qreal progress = 0.0;
180         if ( m_jobIndex < m_runningJobs->count() )
181         {
182             const auto& jobitem = m_runningJobs->at( m_jobIndex );
183             progress = ( jobitem.cumulative + jobitem.weight * percentage ) / m_overallQueueWeight;
184             message = jobitem.job->prettyStatusMessage();
185             // In progress reports at the start of a job (e.g. when the queue
186             // starts the job, or if the job itself reports 0.0) be more
187             // accepting in what gets reported: jobs with no status fall
188             // back to description and name, whichever is non-empty.
189             if ( percentage == 0.0 && message.isEmpty() )
190             {
191                 message = jobitem.job->prettyDescription();
192                 if ( message.isEmpty() )
193                 {
194                     message = jobitem.job->prettyName();
195                 }
196             }
197         }
198         else
199         {
200             progress = 1.0;
201             message = tr( "Done" );
202         }
203         QMetaObject::invokeMethod(
204             m_queue, "progress", Qt::QueuedConnection, Q_ARG( qreal, progress ), Q_ARG( QString, message ) );
205     }
206 
207     mutable QMutex m_runMutex;
208     mutable QMutex m_enqueMutex;
209 
210     std::unique_ptr< WeightedJobList > m_runningJobs = std::make_unique< WeightedJobList >();
211     std::unique_ptr< WeightedJobList > m_queuedJobs = std::make_unique< WeightedJobList >();
212 
213     JobQueue* m_queue;
214     int m_jobIndex = 0;  ///< Index into m_runningJobs
215     qreal m_overallQueueWeight = 0.0;  ///< cumulation when **all** the jobs are done
216 };
217 
~JobThread()218 JobThread::~JobThread() {}
219 
220 
221 JobQueue* JobQueue::s_instance = nullptr;
222 
223 JobQueue*
instance()224 JobQueue::instance()
225 {
226     if ( !s_instance )
227     {
228         cWarning() << "Getting nullptr JobQueue instance.";
229     }
230     return s_instance;
231 }
232 
233 
JobQueue(QObject * parent)234 JobQueue::JobQueue( QObject* parent )
235     : QObject( parent )
236     , m_thread( new JobThread( this ) )
237     , m_storage( new GlobalStorage( this ) )
238 {
239     Q_ASSERT( !s_instance );
240     s_instance = this;
241 }
242 
243 
~JobQueue()244 JobQueue::~JobQueue()
245 {
246     if ( m_thread->isRunning() )
247     {
248         m_thread->terminate();
249         if ( !m_thread->wait( 300 ) )
250         {
251             cError() << "Could not terminate job thread (expect a crash now).";
252         }
253         delete m_thread;
254     }
255 
256     delete m_storage;
257     s_instance = nullptr;
258 }
259 
260 
261 void
start()262 JobQueue::start()
263 {
264     Q_ASSERT( !m_thread->isRunning() );
265     m_thread->finalize();
266     m_finished = false;
267     m_thread->start();
268 }
269 
270 
271 void
enqueue(int moduleWeight,const JobList & jobs)272 JobQueue::enqueue( int moduleWeight, const JobList& jobs )
273 {
274     Q_ASSERT( !m_thread->isRunning() );
275     m_thread->enqueue( moduleWeight, jobs );
276     emit queueChanged( m_thread->queuedJobs() );
277 }
278 
279 void
finish()280 JobQueue::finish()
281 {
282     m_finished = true;
283     emit finished();
284     emit queueChanged( m_thread->queuedJobs() );
285 }
286 
287 GlobalStorage*
globalStorage() const288 JobQueue::globalStorage() const
289 {
290     return m_storage;
291 }
292 
293 }  // namespace Calamares
294 
295 #include "utils/moc-warnings.h"
296 
297 #include "JobQueue.moc"
298