1 // Copyright 2015-2016 the openage authors. See copying.md for legal info.
2 
3 #include "job_aborted_exception.h"
4 #include "job_manager.h"
5 #include "worker.h"
6 
7 
8 namespace openage {
9 namespace job {
10 
11 
Worker(JobManager * manager)12 Worker::Worker(JobManager *manager)
13 	:
14 	manager{manager},
15 	is_running{false} {
16 }
17 
18 
start()19 void Worker::start() {
20 	this->is_running = true;
21 	this->executor.reset(new std::thread{&Worker::process, this});
22 }
23 
24 
stop()25 void Worker::stop() {
26 	std::unique_lock<std::mutex> lock{this->pending_jobs_mutex};
27 	this->is_running = false;
28 	lock.unlock();
29 	this->notify();
30 }
31 
32 
enqueue(std::shared_ptr<JobStateBase> job)33 void Worker::enqueue(std::shared_ptr<JobStateBase> job) {
34 	std::unique_lock<std::mutex> lock{this->pending_jobs_mutex};
35 	this->pending_jobs.push(job);
36 	lock.unlock();
37 	this->notify();
38 }
39 
40 
notify()41 void Worker::notify() {
42 	this->jobs_available.notify_all();
43 }
44 
45 
join()46 void Worker::join() {
47 	this->executor->join();
48 }
49 
50 
execute_job(std::shared_ptr<JobStateBase> & job)51 void Worker::execute_job(std::shared_ptr<JobStateBase> &job) {
52 	auto should_abort = [this]() {
53 		return not this->is_running;
54 	};
55 
56 	bool aborted = job->execute(should_abort);
57 	// if the job was not aborted, tell the job manager, that the job has
58 	// finished
59 	if (not aborted) {
60 		this->manager->finish_job(job);
61 	}
62 }
63 
64 
process()65 void Worker::process() {
66 	// as long as this worker thread is running repeat all steps
67 	while (true) {
68 		// lock the local thread queue
69 		std::unique_lock<std::mutex> lock{this->pending_jobs_mutex};
70 
71 		// if this thread shall not run any longer, exit the loop
72 		if (not this->is_running) {
73 			return;
74 		}
75 
76 		// as long as there are no jobs from the local queue or the job manager
77 		while (this->pending_jobs.empty() and not this->manager->has_job()) {
78 			// the thread should wait
79 			this->jobs_available.wait(lock);
80 
81 			// when the thread is notified, first check if the thread should be
82 			// stopped
83 			if (not this->is_running) {
84 				return;
85 			}
86 		}
87 
88 		// check if there are jobs in the local queue
89 		if (not this->pending_jobs.empty()) {
90 			// fetch the job
91 			auto job = this->pending_jobs.front();
92 			this->pending_jobs.pop();
93 			// release the local queue lock
94 			lock.unlock();
95 			// and execute the job
96 			this->execute_job(job);
97 		} else {
98 			// otherwise just unlock the local queue
99 			lock.unlock();
100 		}
101 
102 		// after possibly executing a job from the local queue, check again if
103 		// the thread should still continue running
104 		if (not this->is_running) {
105 			return;
106 		}
107 
108 		// now try to fetch a job from the job manager
109 		auto manager_job = this->manager->fetch_job();
110 		if (manager_job.get() != nullptr) {
111 			// and execute it
112 			this->execute_job(manager_job);
113 		}
114 	}
115 }
116 
117 
118 }} // namespace openage::job
119