1 // Copyright 2015-2016 the openage authors. See copying.md for legal info. 2 3 #include "job_aborted_exception.h" 4 #include "job_manager.h" 5 #include "worker.h" 6 7 8 namespace openage { 9 namespace job { 10 11 Worker(JobManager * manager)12Worker::Worker(JobManager *manager) 13 : 14 manager{manager}, 15 is_running{false} { 16 } 17 18 start()19void Worker::start() { 20 this->is_running = true; 21 this->executor.reset(new std::thread{&Worker::process, this}); 22 } 23 24 stop()25void Worker::stop() { 26 std::unique_lock<std::mutex> lock{this->pending_jobs_mutex}; 27 this->is_running = false; 28 lock.unlock(); 29 this->notify(); 30 } 31 32 enqueue(std::shared_ptr<JobStateBase> job)33void Worker::enqueue(std::shared_ptr<JobStateBase> job) { 34 std::unique_lock<std::mutex> lock{this->pending_jobs_mutex}; 35 this->pending_jobs.push(job); 36 lock.unlock(); 37 this->notify(); 38 } 39 40 notify()41void Worker::notify() { 42 this->jobs_available.notify_all(); 43 } 44 45 join()46void Worker::join() { 47 this->executor->join(); 48 } 49 50 execute_job(std::shared_ptr<JobStateBase> & job)51void Worker::execute_job(std::shared_ptr<JobStateBase> &job) { 52 auto should_abort = [this]() { 53 return not this->is_running; 54 }; 55 56 bool aborted = job->execute(should_abort); 57 // if the job was not aborted, tell the job manager, that the job has 58 // finished 59 if (not aborted) { 60 this->manager->finish_job(job); 61 } 62 } 63 64 process()65void Worker::process() { 66 // as long as this worker thread is running repeat all steps 67 while (true) { 68 // lock the local thread queue 69 std::unique_lock<std::mutex> lock{this->pending_jobs_mutex}; 70 71 // if this thread shall not run any longer, exit the loop 72 if (not this->is_running) { 73 return; 74 } 75 76 // as long as there are no jobs from the local queue or the job manager 77 while (this->pending_jobs.empty() and not this->manager->has_job()) { 78 // the thread should wait 79 this->jobs_available.wait(lock); 80 81 // when the thread is notified, first check if the thread should be 82 // stopped 83 if (not this->is_running) { 84 return; 85 } 86 } 87 88 // check if there are jobs in the local queue 89 if (not this->pending_jobs.empty()) { 90 // fetch the job 91 auto job = this->pending_jobs.front(); 92 this->pending_jobs.pop(); 93 // release the local queue lock 94 lock.unlock(); 95 // and execute the job 96 this->execute_job(job); 97 } else { 98 // otherwise just unlock the local queue 99 lock.unlock(); 100 } 101 102 // after possibly executing a job from the local queue, check again if 103 // the thread should still continue running 104 if (not this->is_running) { 105 return; 106 } 107 108 // now try to fetch a job from the job manager 109 auto manager_job = this->manager->fetch_job(); 110 if (manager_job.get() != nullptr) { 111 // and execute it 112 this->execute_job(manager_job); 113 } 114 } 115 } 116 117 118 }} // namespace openage::job 119