1 /*
2 * Copyright (C) 2005-2018 Team Kodi
3 * This file is part of Kodi - https://kodi.tv
4 *
5 * SPDX-License-Identifier: GPL-2.0-or-later
6 * See LICENSES/README.md for more information.
7 */
8
9 #include "JobManager.h"
10
11 #include "threads/SingleLock.h"
12 #include "utils/XTimeUtils.h"
13 #include "utils/log.h"
14
15 #include <algorithm>
16 #include <functional>
17 #include <stdexcept>
18
ShouldCancel(unsigned int progress,unsigned int total) const19 bool CJob::ShouldCancel(unsigned int progress, unsigned int total) const
20 {
21 if (m_callback)
22 return m_callback->OnJobProgress(progress, total, this);
23 return false;
24 }
25
CJobWorker(CJobManager * manager)26 CJobWorker::CJobWorker(CJobManager *manager) : CThread("JobWorker")
27 {
28 m_jobManager = manager;
29 Create(true); // start work immediately, and kill ourselves when we're done
30 }
31
~CJobWorker()32 CJobWorker::~CJobWorker()
33 {
34 // while we should already be removed from the job manager, if an exception
35 // occurs during processing that we haven't caught, we may skip over that step.
36 // Thus, before we go out of scope, ensure the job manager knows we're gone.
37 m_jobManager->RemoveWorker(this);
38 if(!IsAutoDelete())
39 StopThread();
40 }
41
Process()42 void CJobWorker::Process()
43 {
44 SetPriority( GetMinPriority() );
45 while (true)
46 {
47 // request an item from our manager (this call is blocking)
48 CJob *job = m_jobManager->GetNextJob(this);
49 if (!job)
50 break;
51
52 bool success = false;
53 try
54 {
55 success = job->DoWork();
56 }
57 catch (...)
58 {
59 CLog::Log(LOGERROR, "%s error processing job %s", __FUNCTION__, job->GetType());
60 }
61 m_jobManager->OnJobComplete(success, job);
62 }
63 }
64
CancelJob()65 void CJobQueue::CJobPointer::CancelJob()
66 {
67 CJobManager::GetInstance().CancelJob(m_id);
68 m_id = 0;
69 }
70
CJobQueue(bool lifo,unsigned int jobsAtOnce,CJob::PRIORITY priority)71 CJobQueue::CJobQueue(bool lifo, unsigned int jobsAtOnce, CJob::PRIORITY priority)
72 : m_jobsAtOnce(jobsAtOnce), m_priority(priority), m_lifo(lifo)
73 {
74 }
75
~CJobQueue()76 CJobQueue::~CJobQueue()
77 {
78 CancelJobs();
79 }
80
OnJobComplete(unsigned int jobID,bool success,CJob * job)81 void CJobQueue::OnJobComplete(unsigned int jobID, bool success, CJob *job)
82 {
83 CSingleLock lock(m_section);
84 // check if this job is in our processing list
85 Processing::iterator i = find(m_processing.begin(), m_processing.end(), job);
86 if (i != m_processing.end())
87 m_processing.erase(i);
88 // request a new job be queued
89 QueueNextJob();
90 }
91
CancelJob(const CJob * job)92 void CJobQueue::CancelJob(const CJob *job)
93 {
94 CSingleLock lock(m_section);
95 Processing::iterator i = find(m_processing.begin(), m_processing.end(), job);
96 if (i != m_processing.end())
97 {
98 i->CancelJob();
99 m_processing.erase(i);
100 return;
101 }
102 Queue::iterator j = find(m_jobQueue.begin(), m_jobQueue.end(), job);
103 if (j != m_jobQueue.end())
104 {
105 j->FreeJob();
106 m_jobQueue.erase(j);
107 }
108 }
109
AddJob(CJob * job)110 bool CJobQueue::AddJob(CJob *job)
111 {
112 CSingleLock lock(m_section);
113 // check if we have this job already. If so, we're done.
114 if (find(m_jobQueue.begin(), m_jobQueue.end(), job) != m_jobQueue.end() ||
115 find(m_processing.begin(), m_processing.end(), job) != m_processing.end())
116 {
117 delete job;
118 return false;
119 }
120
121 if (m_lifo)
122 m_jobQueue.push_back(CJobPointer(job));
123 else
124 m_jobQueue.push_front(CJobPointer(job));
125 QueueNextJob();
126
127 return true;
128 }
129
QueueNextJob()130 void CJobQueue::QueueNextJob()
131 {
132 CSingleLock lock(m_section);
133 if (m_jobQueue.size() && m_processing.size() < m_jobsAtOnce)
134 {
135 CJobPointer &job = m_jobQueue.back();
136 job.m_id = CJobManager::GetInstance().AddJob(job.m_job, this, m_priority);
137 m_processing.push_back(job);
138 m_jobQueue.pop_back();
139 }
140 }
141
CancelJobs()142 void CJobQueue::CancelJobs()
143 {
144 CSingleLock lock(m_section);
145 for_each(m_processing.begin(), m_processing.end(), [](CJobPointer& jp) { jp.CancelJob(); });
146 for_each(m_jobQueue.begin(), m_jobQueue.end(), [](CJobPointer& jp) { jp.FreeJob(); });
147 m_jobQueue.clear();
148 m_processing.clear();
149 }
150
IsProcessing() const151 bool CJobQueue::IsProcessing() const
152 {
153 return CJobManager::GetInstance().m_running && (!m_processing.empty() || !m_jobQueue.empty());
154 }
155
QueueEmpty() const156 bool CJobQueue::QueueEmpty() const
157 {
158 CSingleLock lock(m_section);
159 return m_jobQueue.empty();
160 }
161
GetInstance()162 CJobManager &CJobManager::GetInstance()
163 {
164 static CJobManager sJobManager;
165 return sJobManager;
166 }
167
CJobManager()168 CJobManager::CJobManager()
169 {
170 m_jobCounter = 0;
171 m_running = true;
172 m_pauseJobs = false;
173 }
174
Restart()175 void CJobManager::Restart()
176 {
177 CSingleLock lock(m_section);
178
179 if (m_running)
180 throw std::logic_error("CJobManager already running");
181 m_running = true;
182 }
183
CancelJobs()184 void CJobManager::CancelJobs()
185 {
186 CSingleLock lock(m_section);
187 m_running = false;
188
189 // clear any pending jobs
190 for (unsigned int priority = CJob::PRIORITY_LOW_PAUSABLE; priority <= CJob::PRIORITY_DEDICATED; ++priority)
191 {
192 for_each(m_jobQueue[priority].begin(), m_jobQueue[priority].end(), [](CWorkItem& wi) { wi.FreeJob(); });
193 m_jobQueue[priority].clear();
194 }
195
196 // cancel any callbacks on jobs still processing
197 for_each(m_processing.begin(), m_processing.end(), [](CWorkItem& wi) { wi.Cancel(); });
198
199 // tell our workers to finish
200 while (m_workers.size())
201 {
202 lock.Leave();
203 m_jobEvent.Set();
204 std::this_thread::yield(); // yield after setting the event to give the workers some time to die
205 lock.Enter();
206 }
207 }
208
AddJob(CJob * job,IJobCallback * callback,CJob::PRIORITY priority)209 unsigned int CJobManager::AddJob(CJob *job, IJobCallback *callback, CJob::PRIORITY priority)
210 {
211 CSingleLock lock(m_section);
212
213 if (!m_running)
214 return 0;
215
216 // increment the job counter, ensuring 0 (invalid job) is never hit
217 m_jobCounter++;
218 if (m_jobCounter == 0)
219 m_jobCounter++;
220
221 // create a work item for this job
222 CWorkItem work(job, m_jobCounter, priority, callback);
223 m_jobQueue[priority].push_back(work);
224
225 StartWorkers(priority);
226 return work.m_id;
227 }
228
CancelJob(unsigned int jobID)229 void CJobManager::CancelJob(unsigned int jobID)
230 {
231 CSingleLock lock(m_section);
232
233 // check whether we have this job in the queue
234 for (unsigned int priority = CJob::PRIORITY_LOW_PAUSABLE; priority <= CJob::PRIORITY_DEDICATED; ++priority)
235 {
236 JobQueue::iterator i = find(m_jobQueue[priority].begin(), m_jobQueue[priority].end(), jobID);
237 if (i != m_jobQueue[priority].end())
238 {
239 delete i->m_job;
240 m_jobQueue[priority].erase(i);
241 return;
242 }
243 }
244 // or if we're processing it
245 Processing::iterator it = find(m_processing.begin(), m_processing.end(), jobID);
246 if (it != m_processing.end())
247 it->m_callback = NULL; // job is in progress, so only thing to do is to remove callback
248 }
249
StartWorkers(CJob::PRIORITY priority)250 void CJobManager::StartWorkers(CJob::PRIORITY priority)
251 {
252 CSingleLock lock(m_section);
253
254 // check how many free threads we have
255 if (m_processing.size() >= GetMaxWorkers(priority))
256 return;
257
258 // do we have any sleeping threads?
259 if (m_processing.size() < m_workers.size())
260 {
261 m_jobEvent.Set();
262 return;
263 }
264
265 // everyone is busy - we need more workers
266 m_workers.push_back(new CJobWorker(this));
267 }
268
PopJob()269 CJob *CJobManager::PopJob()
270 {
271 CSingleLock lock(m_section);
272 for (int priority = CJob::PRIORITY_DEDICATED; priority >= CJob::PRIORITY_LOW_PAUSABLE; --priority)
273 {
274 // Check whether we're pausing pausable jobs
275 if (priority == CJob::PRIORITY_LOW_PAUSABLE && m_pauseJobs)
276 continue;
277
278 if (m_jobQueue[priority].size() && m_processing.size() < GetMaxWorkers(CJob::PRIORITY(priority)))
279 {
280 // pop the job off the queue
281 CWorkItem job = m_jobQueue[priority].front();
282 m_jobQueue[priority].pop_front();
283
284 // add to the processing vector
285 m_processing.push_back(job);
286 job.m_job->m_callback = this;
287 return job.m_job;
288 }
289 }
290 return NULL;
291 }
292
PauseJobs()293 void CJobManager::PauseJobs()
294 {
295 CSingleLock lock(m_section);
296 m_pauseJobs = true;
297 }
298
UnPauseJobs()299 void CJobManager::UnPauseJobs()
300 {
301 CSingleLock lock(m_section);
302 m_pauseJobs = false;
303 }
304
IsProcessing(const CJob::PRIORITY & priority) const305 bool CJobManager::IsProcessing(const CJob::PRIORITY &priority) const
306 {
307 CSingleLock lock(m_section);
308
309 if (m_pauseJobs)
310 return false;
311
312 for(Processing::const_iterator it = m_processing.begin(); it < m_processing.end(); ++it)
313 {
314 if (priority == it->m_priority)
315 return true;
316 }
317 return false;
318 }
319
IsProcessing(const std::string & type) const320 int CJobManager::IsProcessing(const std::string &type) const
321 {
322 int jobsMatched = 0;
323 CSingleLock lock(m_section);
324
325 if (m_pauseJobs)
326 return 0;
327
328 for(Processing::const_iterator it = m_processing.begin(); it < m_processing.end(); ++it)
329 {
330 if (type == std::string(it->m_job->GetType()))
331 jobsMatched++;
332 }
333 return jobsMatched;
334 }
335
GetNextJob(const CJobWorker * worker)336 CJob *CJobManager::GetNextJob(const CJobWorker *worker)
337 {
338 CSingleLock lock(m_section);
339 while (m_running)
340 {
341 // grab a job off the queue if we have one
342 CJob *job = PopJob();
343 if (job)
344 return job;
345 // no jobs are left - sleep for 30 seconds to allow new jobs to come in
346 lock.Leave();
347 bool newJob = m_jobEvent.WaitMSec(30000);
348 lock.Enter();
349 if (!newJob)
350 break;
351 }
352 // ensure no jobs have come in during the period after
353 // timeout and before we held the lock
354 CJob *job = PopJob();
355 if (job)
356 return job;
357 // have no jobs
358 RemoveWorker(worker);
359 return NULL;
360 }
361
OnJobProgress(unsigned int progress,unsigned int total,const CJob * job) const362 bool CJobManager::OnJobProgress(unsigned int progress, unsigned int total, const CJob *job) const
363 {
364 CSingleLock lock(m_section);
365 // find the job in the processing queue, and check whether it's cancelled (no callback)
366 Processing::const_iterator i = find(m_processing.begin(), m_processing.end(), job);
367 if (i != m_processing.end())
368 {
369 CWorkItem item(*i);
370 lock.Leave(); // leave section prior to call
371 if (item.m_callback)
372 {
373 item.m_callback->OnJobProgress(item.m_id, progress, total, job);
374 return false;
375 }
376 }
377 return true; // couldn't find the job, or it's been cancelled
378 }
379
OnJobComplete(bool success,CJob * job)380 void CJobManager::OnJobComplete(bool success, CJob *job)
381 {
382 CSingleLock lock(m_section);
383 // remove the job from the processing queue
384 Processing::iterator i = find(m_processing.begin(), m_processing.end(), job);
385 if (i != m_processing.end())
386 {
387 // tell any listeners we're done with the job, then delete it
388 CWorkItem item(*i);
389 lock.Leave();
390 try
391 {
392 if (item.m_callback)
393 item.m_callback->OnJobComplete(item.m_id, success, item.m_job);
394 }
395 catch (...)
396 {
397 CLog::Log(LOGERROR, "%s error processing job %s", __FUNCTION__, item.m_job->GetType());
398 }
399 lock.Enter();
400 Processing::iterator j = find(m_processing.begin(), m_processing.end(), job);
401 if (j != m_processing.end())
402 m_processing.erase(j);
403 lock.Leave();
404 item.FreeJob();
405 }
406 }
407
RemoveWorker(const CJobWorker * worker)408 void CJobManager::RemoveWorker(const CJobWorker *worker)
409 {
410 CSingleLock lock(m_section);
411 // remove our worker
412 Workers::iterator i = find(m_workers.begin(), m_workers.end(), worker);
413 if (i != m_workers.end())
414 m_workers.erase(i); // workers auto-delete
415 }
416
GetMaxWorkers(CJob::PRIORITY priority)417 unsigned int CJobManager::GetMaxWorkers(CJob::PRIORITY priority)
418 {
419 static const unsigned int max_workers = 5;
420 if (priority == CJob::PRIORITY_DEDICATED)
421 return 10000; // A large number..
422 return max_workers - (CJob::PRIORITY_HIGH - priority);
423 }
424