1 /**************************************************************************/
2 /*  Copyright 2012 Tim Day                                                */
3 /*                                                                        */
4 /*  This file is part of Evolvotron                                       */
5 /*                                                                        */
6 /*  Evolvotron is free software: you can redistribute it and/or modify    */
7 /*  it under the terms of the GNU General Public License as published by  */
8 /*  the Free Software Foundation, either version 3 of the License, or     */
9 /*  (at your option) any later version.                                   */
10 /*                                                                        */
11 /*  Evolvotron is distributed in the hope that it will be useful,         */
12 /*  but WITHOUT ANY WARRANTY; without even the implied warranty of        */
13 /*  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         */
14 /*  GNU General Public License for more details.                          */
15 /*                                                                        */
16 /*  You should have received a copy of the GNU General Public License     */
17 /*  along with Evolvotron.  If not, see <http://www.gnu.org/licenses/>.   */
18 /**************************************************************************/
19 
20 /*! \file
21   \brief Implementation of class MutatableImageComputerFarm.
22 */
23 
24 
25 
26 #include "mutatable_image_computer_farm.h"
27 
28 #include "mutatable_image_computer.h"
29 
30 /*! Creates the specified number of threads and store pointers to them.
31  */
MutatableImageComputerFarm(uint n_threads,int niceness)32 MutatableImageComputerFarm::MutatableImageComputerFarm(uint n_threads,int niceness)
33 {
34   _done_position=_done.end();
35 
36   for (uint i=0;i<n_threads;i++)
37     {
38       // The computer's constructor includes a start()
39       _computers.push_back(new MutatableImageComputer(this,niceness));
40     }
41 }
42 
43 /*! Destructor kills off all compute threads and frees their resources.
44   NB The MutatableImageComputer destructor signals the thread to stop and waits for it.
45  */
~MutatableImageComputerFarm()46 MutatableImageComputerFarm::~MutatableImageComputerFarm()
47 {
48   std::clog << "Compute farm shut down begun...\n";
49 
50   // Kill all the computers (care needed to wake any waiting ones).
51   for (boost::ptr_vector<MutatableImageComputer>::iterator it=_computers.begin();it!=_computers.end();it++) (*it).kill();
52   _wait_condition.wakeAll();
53   _computers.clear();
54 
55   // Clear all the tasks in queues
56   {
57     QMutexLocker lock(&_mutex);
58     _todo.clear();
59     _done.clear();
60   }
61 
62   std::clog << "...completed compute farm shut down\n";
63 }
64 
65 //! Predicate function to test whether a task has been aborted
predicate_aborted(const boost::shared_ptr<const MutatableImageComputerTask> t)66 static bool predicate_aborted(const boost::shared_ptr<const MutatableImageComputerTask> t)
67 {
68   return t->aborted();
69 }
70 
fasttrack_aborted()71 void MutatableImageComputerFarm::fasttrack_aborted()
72 {
73   QMutexLocker lock(&_mutex);
74 
75   // \todo: Inefficient starting search again each time.  Some problem with erase otherwise though, but might have been task abort mem leak.
76   TodoQueue::iterator it;
77   while (
78 	 (
79 	  it=std::find_if(_todo.begin(),_todo.end(),predicate_aborted)
80 	  )
81 	 !=
82 	 _todo.end()
83 	 )
84     {
85       _done[(*it)->display()].insert(*it);
86       _todo.erase(it);
87     }
88 }
89 
push_todo(const boost::shared_ptr<MutatableImageComputerTask> & task)90 void MutatableImageComputerFarm::push_todo(const boost::shared_ptr<MutatableImageComputerTask>& task)
91 {
92   {
93     QMutexLocker lock(&_mutex);
94 
95     // We could be in a situation where there are tasks with lower priority which should be defered in favour of this one.
96     // Currently we simply defer everything with a lower priority and let the queue sort them out.
97     //! \todo: It would be better to just defer the lowest priority task if there's any less than the queued task.
98     /*
99     bool any_deferred=false;
100     for (boost::ptr_vector<MutatableImageComputer>::iterator it=_computers.begin();it!=_computers.end();it++)
101       {
102 	if ((*it).defer_if_less_important_than(task->priority()))
103 	  {
104 	    any_deferred=true;
105 	  }
106       }
107     */
108 
109     _todo.insert(task);
110   }
111 
112   // If there any threads waiting, we should wake one up.
113   _wait_condition.wakeOne();
114 }
115 
pop_todo(MutatableImageComputer & requester)116 const boost::shared_ptr<MutatableImageComputerTask> MutatableImageComputerFarm::pop_todo(MutatableImageComputer& requester)
117 {
118   _mutex.lock();
119   boost::shared_ptr<MutatableImageComputerTask> ret;
120   while (!ret)
121     {
122       TodoQueue::iterator it=_todo.begin();
123       if (it!=_todo.end())
124 	{
125 	  ret=(*it);
126 	  _todo.erase(it);
127 	}
128       else
129 	{
130 	  std::clog << "Thread waiting\n";
131 	  _wait_condition.wait(&_mutex);
132 	  std::clog << "Thread woken\n";
133 	  if (requester.killed()) break;
134 	}
135     }
136   _mutex.unlock();
137   return ret;
138 }
139 
push_done(const boost::shared_ptr<MutatableImageComputerTask> & task)140 void MutatableImageComputerFarm::push_done(const boost::shared_ptr<MutatableImageComputerTask>& task)
141 {
142   QMutexLocker lock(&_mutex);
143   _done[task->display()].insert(task);
144 }
145 
pop_done()146 const boost::shared_ptr<MutatableImageComputerTask> MutatableImageComputerFarm::pop_done()
147 {
148   QMutexLocker lock(&_mutex);
149 
150   boost::shared_ptr<MutatableImageComputerTask> ret;
151   if (_done_position==_done.end())
152     {
153       _done_position=_done.begin();
154     }
155 
156   if (_done_position!=_done.end())
157     {
158       DoneQueue& q=(*_done_position).second;
159       DoneQueue::iterator it=q.begin();
160       if (it!=q.end())
161 	{
162 	  ret=(*it);
163 	  q.erase(it);
164 	}
165 
166       if (q.empty())
167 	{
168 	  DoneQueueByDisplay::iterator advanced_done_position=_done_position;
169 	  advanced_done_position++;
170 	  _done.erase(_done_position);
171 	  _done_position=advanced_done_position;
172 	}
173       else
174 	{
175 	  _done_position++;
176 	}
177     }
178 
179   return ret;
180 }
181 
abort_all()182 void MutatableImageComputerFarm::abort_all()
183 {
184   QMutexLocker lock(&_mutex);
185 
186   for (TodoQueue::iterator it=_todo.begin();it!=_todo.end();it++)
187     {
188       (*it)->abort();
189     }
190   _todo.clear();
191 
192   for (boost::ptr_vector<MutatableImageComputer>::iterator it=_computers.begin();it!=_computers.end();it++)
193     {
194       (*it).abort();
195     }
196 
197   for (DoneQueueByDisplay::iterator it0=_done.begin();it0!=_done.end();it0++)
198     {
199       DoneQueue& q=(*it0).second;
200       for (DoneQueue::iterator it1=q.begin();it1!=q.end();it1++)
201 	{
202 	  (*it1)->abort();
203 	}
204     }
205   _done.clear();
206 }
207 
abort_for(const MutatableImageDisplay * disp)208 void MutatableImageComputerFarm::abort_for(const MutatableImageDisplay* disp)
209 {
210   QMutexLocker lock(&_mutex);
211 
212   for (TodoQueue::iterator it=_todo.begin();it!=_todo.end();it++)
213     {
214       if ((*it)->display()==disp)
215 	{
216 	  (*it)->abort();
217 	  _todo.erase(it);
218 	}
219     }
220 
221   for (boost::ptr_vector<MutatableImageComputer>::iterator it=_computers.begin();it!=_computers.end();it++)
222     {
223       (*it).abort_for(disp);
224     }
225 
226   DoneQueueByDisplay::iterator it0=_done.find(disp);
227   if (it0!=_done.end())
228     {
229       DoneQueue& q=(*it0).second;
230 
231       //! \todo It would be pretty odd if display didn't match the queue bin: change to assert
232       for (DoneQueue::iterator it1=q.begin();it1!=q.end();it1++)
233 	{
234 	  if ((*it1)->display()==disp)
235 	    {
236 	      (*it1)->abort();
237 	      q.erase(it1);
238 	    }
239 	}
240     }
241 }
242 
tasks() const243 uint MutatableImageComputerFarm::tasks() const
244 {
245   uint ret=0;
246 
247   for (boost::ptr_vector<MutatableImageComputer>::const_iterator it=_computers.begin();it!=_computers.end();it++)
248     {
249       if ((*it).active())
250 	{
251 	  ret++;
252 	}
253     }
254 
255   QMutexLocker lock(&_mutex);
256 
257   ret+=_todo.size();
258 
259   for (DoneQueueByDisplay::const_iterator it=_done.begin();it!=_done.end();it++)
260     ret+=(*it).second.size();
261 
262   return ret;
263 }
264