1 /*
2  *  Copyright (C) 2005-2018 Team Kodi
3  *  This file is part of Kodi - https://kodi.tv
4  *
5  *  SPDX-License-Identifier: GPL-2.0-or-later
6  *  See LICENSES/README.md for more information.
7  */
8 
9 #include "JobManager.h"
10 
11 #include "threads/SingleLock.h"
12 #include "utils/XTimeUtils.h"
13 #include "utils/log.h"
14 
15 #include <algorithm>
16 #include <functional>
17 #include <stdexcept>
18 
ShouldCancel(unsigned int progress,unsigned int total) const19 bool CJob::ShouldCancel(unsigned int progress, unsigned int total) const
20 {
21   if (m_callback)
22     return m_callback->OnJobProgress(progress, total, this);
23   return false;
24 }
25 
CJobWorker(CJobManager * manager)26 CJobWorker::CJobWorker(CJobManager *manager) : CThread("JobWorker")
27 {
28   m_jobManager = manager;
29   Create(true); // start work immediately, and kill ourselves when we're done
30 }
31 
~CJobWorker()32 CJobWorker::~CJobWorker()
33 {
34   // while we should already be removed from the job manager, if an exception
35   // occurs during processing that we haven't caught, we may skip over that step.
36   // Thus, before we go out of scope, ensure the job manager knows we're gone.
37   m_jobManager->RemoveWorker(this);
38   if(!IsAutoDelete())
39     StopThread();
40 }
41 
Process()42 void CJobWorker::Process()
43 {
44   SetPriority( GetMinPriority() );
45   while (true)
46   {
47     // request an item from our manager (this call is blocking)
48     CJob *job = m_jobManager->GetNextJob(this);
49     if (!job)
50       break;
51 
52     bool success = false;
53     try
54     {
55       success = job->DoWork();
56     }
57     catch (...)
58     {
59       CLog::Log(LOGERROR, "%s error processing job %s", __FUNCTION__, job->GetType());
60     }
61     m_jobManager->OnJobComplete(success, job);
62   }
63 }
64 
CancelJob()65 void CJobQueue::CJobPointer::CancelJob()
66 {
67   CJobManager::GetInstance().CancelJob(m_id);
68   m_id = 0;
69 }
70 
CJobQueue(bool lifo,unsigned int jobsAtOnce,CJob::PRIORITY priority)71 CJobQueue::CJobQueue(bool lifo, unsigned int jobsAtOnce, CJob::PRIORITY priority)
72 : m_jobsAtOnce(jobsAtOnce), m_priority(priority), m_lifo(lifo)
73 {
74 }
75 
~CJobQueue()76 CJobQueue::~CJobQueue()
77 {
78   CancelJobs();
79 }
80 
OnJobComplete(unsigned int jobID,bool success,CJob * job)81 void CJobQueue::OnJobComplete(unsigned int jobID, bool success, CJob *job)
82 {
83   CSingleLock lock(m_section);
84   // check if this job is in our processing list
85   Processing::iterator i = find(m_processing.begin(), m_processing.end(), job);
86   if (i != m_processing.end())
87     m_processing.erase(i);
88   // request a new job be queued
89   QueueNextJob();
90 }
91 
CancelJob(const CJob * job)92 void CJobQueue::CancelJob(const CJob *job)
93 {
94   CSingleLock lock(m_section);
95   Processing::iterator i = find(m_processing.begin(), m_processing.end(), job);
96   if (i != m_processing.end())
97   {
98     i->CancelJob();
99     m_processing.erase(i);
100     return;
101   }
102   Queue::iterator j = find(m_jobQueue.begin(), m_jobQueue.end(), job);
103   if (j != m_jobQueue.end())
104   {
105     j->FreeJob();
106     m_jobQueue.erase(j);
107   }
108 }
109 
AddJob(CJob * job)110 bool CJobQueue::AddJob(CJob *job)
111 {
112   CSingleLock lock(m_section);
113   // check if we have this job already.  If so, we're done.
114   if (find(m_jobQueue.begin(), m_jobQueue.end(), job) != m_jobQueue.end() ||
115       find(m_processing.begin(), m_processing.end(), job) != m_processing.end())
116   {
117     delete job;
118     return false;
119   }
120 
121   if (m_lifo)
122     m_jobQueue.push_back(CJobPointer(job));
123   else
124     m_jobQueue.push_front(CJobPointer(job));
125   QueueNextJob();
126 
127   return true;
128 }
129 
QueueNextJob()130 void CJobQueue::QueueNextJob()
131 {
132   CSingleLock lock(m_section);
133   if (m_jobQueue.size() && m_processing.size() < m_jobsAtOnce)
134   {
135     CJobPointer &job = m_jobQueue.back();
136     job.m_id = CJobManager::GetInstance().AddJob(job.m_job, this, m_priority);
137     m_processing.push_back(job);
138     m_jobQueue.pop_back();
139   }
140 }
141 
CancelJobs()142 void CJobQueue::CancelJobs()
143 {
144   CSingleLock lock(m_section);
145   for_each(m_processing.begin(), m_processing.end(), [](CJobPointer& jp) { jp.CancelJob(); });
146   for_each(m_jobQueue.begin(), m_jobQueue.end(), [](CJobPointer& jp) { jp.FreeJob(); });
147   m_jobQueue.clear();
148   m_processing.clear();
149 }
150 
IsProcessing() const151 bool CJobQueue::IsProcessing() const
152 {
153   return CJobManager::GetInstance().m_running && (!m_processing.empty() || !m_jobQueue.empty());
154 }
155 
QueueEmpty() const156 bool CJobQueue::QueueEmpty() const
157 {
158   CSingleLock lock(m_section);
159   return m_jobQueue.empty();
160 }
161 
GetInstance()162 CJobManager &CJobManager::GetInstance()
163 {
164   static CJobManager sJobManager;
165   return sJobManager;
166 }
167 
CJobManager()168 CJobManager::CJobManager()
169 {
170   m_jobCounter = 0;
171   m_running = true;
172   m_pauseJobs = false;
173 }
174 
Restart()175 void CJobManager::Restart()
176 {
177   CSingleLock lock(m_section);
178 
179   if (m_running)
180     throw std::logic_error("CJobManager already running");
181   m_running = true;
182 }
183 
CancelJobs()184 void CJobManager::CancelJobs()
185 {
186   CSingleLock lock(m_section);
187   m_running = false;
188 
189   // clear any pending jobs
190   for (unsigned int priority = CJob::PRIORITY_LOW_PAUSABLE; priority <= CJob::PRIORITY_DEDICATED; ++priority)
191   {
192     for_each(m_jobQueue[priority].begin(), m_jobQueue[priority].end(), [](CWorkItem& wi) { wi.FreeJob(); });
193     m_jobQueue[priority].clear();
194   }
195 
196   // cancel any callbacks on jobs still processing
197   for_each(m_processing.begin(), m_processing.end(), [](CWorkItem& wi) { wi.Cancel(); });
198 
199   // tell our workers to finish
200   while (m_workers.size())
201   {
202     lock.Leave();
203     m_jobEvent.Set();
204     std::this_thread::yield(); // yield after setting the event to give the workers some time to die
205     lock.Enter();
206   }
207 }
208 
AddJob(CJob * job,IJobCallback * callback,CJob::PRIORITY priority)209 unsigned int CJobManager::AddJob(CJob *job, IJobCallback *callback, CJob::PRIORITY priority)
210 {
211   CSingleLock lock(m_section);
212 
213   if (!m_running)
214     return 0;
215 
216   // increment the job counter, ensuring 0 (invalid job) is never hit
217   m_jobCounter++;
218   if (m_jobCounter == 0)
219     m_jobCounter++;
220 
221   // create a work item for this job
222   CWorkItem work(job, m_jobCounter, priority, callback);
223   m_jobQueue[priority].push_back(work);
224 
225   StartWorkers(priority);
226   return work.m_id;
227 }
228 
CancelJob(unsigned int jobID)229 void CJobManager::CancelJob(unsigned int jobID)
230 {
231   CSingleLock lock(m_section);
232 
233   // check whether we have this job in the queue
234   for (unsigned int priority = CJob::PRIORITY_LOW_PAUSABLE; priority <= CJob::PRIORITY_DEDICATED; ++priority)
235   {
236     JobQueue::iterator i = find(m_jobQueue[priority].begin(), m_jobQueue[priority].end(), jobID);
237     if (i != m_jobQueue[priority].end())
238     {
239       delete i->m_job;
240       m_jobQueue[priority].erase(i);
241       return;
242     }
243   }
244   // or if we're processing it
245   Processing::iterator it = find(m_processing.begin(), m_processing.end(), jobID);
246   if (it != m_processing.end())
247     it->m_callback = NULL; // job is in progress, so only thing to do is to remove callback
248 }
249 
StartWorkers(CJob::PRIORITY priority)250 void CJobManager::StartWorkers(CJob::PRIORITY priority)
251 {
252   CSingleLock lock(m_section);
253 
254   // check how many free threads we have
255   if (m_processing.size() >= GetMaxWorkers(priority))
256     return;
257 
258   // do we have any sleeping threads?
259   if (m_processing.size() < m_workers.size())
260   {
261     m_jobEvent.Set();
262     return;
263   }
264 
265   // everyone is busy - we need more workers
266   m_workers.push_back(new CJobWorker(this));
267 }
268 
PopJob()269 CJob *CJobManager::PopJob()
270 {
271   CSingleLock lock(m_section);
272   for (int priority = CJob::PRIORITY_DEDICATED; priority >= CJob::PRIORITY_LOW_PAUSABLE; --priority)
273   {
274     // Check whether we're pausing pausable jobs
275     if (priority == CJob::PRIORITY_LOW_PAUSABLE && m_pauseJobs)
276       continue;
277 
278     if (m_jobQueue[priority].size() && m_processing.size() < GetMaxWorkers(CJob::PRIORITY(priority)))
279     {
280       // pop the job off the queue
281       CWorkItem job = m_jobQueue[priority].front();
282       m_jobQueue[priority].pop_front();
283 
284       // add to the processing vector
285       m_processing.push_back(job);
286       job.m_job->m_callback = this;
287       return job.m_job;
288     }
289   }
290   return NULL;
291 }
292 
PauseJobs()293 void CJobManager::PauseJobs()
294 {
295   CSingleLock lock(m_section);
296   m_pauseJobs = true;
297 }
298 
UnPauseJobs()299 void CJobManager::UnPauseJobs()
300 {
301   CSingleLock lock(m_section);
302   m_pauseJobs = false;
303 }
304 
IsProcessing(const CJob::PRIORITY & priority) const305 bool CJobManager::IsProcessing(const CJob::PRIORITY &priority) const
306 {
307   CSingleLock lock(m_section);
308 
309   if (m_pauseJobs)
310     return false;
311 
312   for(Processing::const_iterator it = m_processing.begin(); it < m_processing.end(); ++it)
313   {
314     if (priority == it->m_priority)
315       return true;
316   }
317   return false;
318 }
319 
IsProcessing(const std::string & type) const320 int CJobManager::IsProcessing(const std::string &type) const
321 {
322   int jobsMatched = 0;
323   CSingleLock lock(m_section);
324 
325   if (m_pauseJobs)
326     return 0;
327 
328   for(Processing::const_iterator it = m_processing.begin(); it < m_processing.end(); ++it)
329   {
330     if (type == std::string(it->m_job->GetType()))
331       jobsMatched++;
332   }
333   return jobsMatched;
334 }
335 
GetNextJob(const CJobWorker * worker)336 CJob *CJobManager::GetNextJob(const CJobWorker *worker)
337 {
338   CSingleLock lock(m_section);
339   while (m_running)
340   {
341     // grab a job off the queue if we have one
342     CJob *job = PopJob();
343     if (job)
344       return job;
345     // no jobs are left - sleep for 30 seconds to allow new jobs to come in
346     lock.Leave();
347     bool newJob = m_jobEvent.WaitMSec(30000);
348     lock.Enter();
349     if (!newJob)
350       break;
351   }
352   // ensure no jobs have come in during the period after
353   // timeout and before we held the lock
354   CJob *job = PopJob();
355   if (job)
356     return job;
357   // have no jobs
358   RemoveWorker(worker);
359   return NULL;
360 }
361 
OnJobProgress(unsigned int progress,unsigned int total,const CJob * job) const362 bool CJobManager::OnJobProgress(unsigned int progress, unsigned int total, const CJob *job) const
363 {
364   CSingleLock lock(m_section);
365   // find the job in the processing queue, and check whether it's cancelled (no callback)
366   Processing::const_iterator i = find(m_processing.begin(), m_processing.end(), job);
367   if (i != m_processing.end())
368   {
369     CWorkItem item(*i);
370     lock.Leave(); // leave section prior to call
371     if (item.m_callback)
372     {
373       item.m_callback->OnJobProgress(item.m_id, progress, total, job);
374       return false;
375     }
376   }
377   return true; // couldn't find the job, or it's been cancelled
378 }
379 
OnJobComplete(bool success,CJob * job)380 void CJobManager::OnJobComplete(bool success, CJob *job)
381 {
382   CSingleLock lock(m_section);
383   // remove the job from the processing queue
384   Processing::iterator i = find(m_processing.begin(), m_processing.end(), job);
385   if (i != m_processing.end())
386   {
387     // tell any listeners we're done with the job, then delete it
388     CWorkItem item(*i);
389     lock.Leave();
390     try
391     {
392       if (item.m_callback)
393         item.m_callback->OnJobComplete(item.m_id, success, item.m_job);
394     }
395     catch (...)
396     {
397       CLog::Log(LOGERROR, "%s error processing job %s", __FUNCTION__, item.m_job->GetType());
398     }
399     lock.Enter();
400     Processing::iterator j = find(m_processing.begin(), m_processing.end(), job);
401     if (j != m_processing.end())
402       m_processing.erase(j);
403     lock.Leave();
404     item.FreeJob();
405   }
406 }
407 
RemoveWorker(const CJobWorker * worker)408 void CJobManager::RemoveWorker(const CJobWorker *worker)
409 {
410   CSingleLock lock(m_section);
411   // remove our worker
412   Workers::iterator i = find(m_workers.begin(), m_workers.end(), worker);
413   if (i != m_workers.end())
414     m_workers.erase(i); // workers auto-delete
415 }
416 
GetMaxWorkers(CJob::PRIORITY priority)417 unsigned int CJobManager::GetMaxWorkers(CJob::PRIORITY priority)
418 {
419   static const unsigned int max_workers = 5;
420   if (priority == CJob::PRIORITY_DEDICATED)
421     return 10000; // A large number..
422   return max_workers - (CJob::PRIORITY_HIGH - priority);
423 }
424