1 // Parallel abstract graph traverser. Contributed by Bjarne Knudsen.
2 
#include <atomic>
#include <condition_variable>
#include <deque>
#include <iomanip>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>
//#include <chrono>

#include "gfanlib_paralleltraverser.h"
12 
13 /*
14   TODO:
15 
16   - Consider giving JobTransfers an affinity to a specific thread.
17 
18   - Consider if the amount of entries to be copied in the stack for
19     deep searches can be reduced.
20 
21   - Consider making a more compact stack for limited in and out edge
22     counts.
23  */
24 
25 using namespace std;
26 
27 namespace gfan{
28 
// One frame of the traversal stack: how far a single state has been
// explored and how to get back to the state it was reached from.
struct TraverseState {
  // Number of outgoing ("next") edges of this state.
  int  next_count;

  // Index of the outgoing edge followed most recently. Frames are
  // created with -1 and the index is incremented before each use.
  int  next_index;

  // Index of the "previous" edge leading back to the parent state.
  // The start frame is created with -1 and is never stepped back from.
  int  prev_index;

  TraverseState( int  next_count,
                 int  next_index,
                 int  prev_index )
    : next_count(next_count),
      next_index(next_index),
      prev_index(prev_index)
  {
  }
};
49 
50 
traverse_simple_recursive(Traverser * traverser)51 void  traverse_simple_recursive( Traverser*  traverser )
52 {
53   int  count  =  traverser->getEdgeCountNext();
54 
55   traverser->collectInfo();
56 
57   for (int i = 0; i < count; i++) {
58     int  prev_index  =  traverser->moveToNext(i, true);
59 
60     if (prev_index == 0) {
61       // Only traverse each state once, so do it for the zero'th
62       // parent.
63       traverse_simple_recursive(traverser);
64     }
65 
66     traverser->moveToPrev(prev_index);
67   }
68 }
69 
70 
71 // This function creates a stack that represents the full job to be
72 // done. Info is also collected for the start state.
create_first_job_stack(Traverser * traverser)73 vector<TraverseState>*  create_first_job_stack( Traverser*  traverser )
74 {
75   vector<TraverseState>*  stack  =  new vector<TraverseState>();
76 
77   stack->push_back(TraverseState(traverser->getEdgeCountNext(), -1, -1));
78 
79   traverser->collectInfo();
80 
81   return stack;
82 }
83 
84 
traverse_simple_stack(Traverser * traverser)85 void  traverse_simple_stack( Traverser*  traverser )
86 {
87   // The stack holds information about what we have done so far
88   vector<TraverseState>*  stack  =  create_first_job_stack(traverser);
89 
90   while (!stack->empty()) {
91     stack->back().next_index++;
92 
93     TraverseState  state  =  stack->back();
94 
95     if (state.next_index == state.next_count || traverser->aborting /* Added by Anders */) {
96       if (state.prev_index != -1) {
97         traverser->moveToPrev(state.prev_index);
98       }
99       stack->pop_back();
100     }
101     else {
102       int  prev_index  =  traverser->moveToNext(state.next_index, true);
103 
104       if (prev_index == 0) {
105         traverser->collectInfo();
106         stack->push_back(TraverseState(traverser->getEdgeCountNext(), -1, prev_index));
107       }
108       else {
109         traverser->moveToPrev(prev_index);
110       }
111     }
112   }
113 
114   delete stack;
115 }
116 
117 
traverse_simple(Traverser * traverser)118 void  traverse_simple( Traverser*  traverser )
119 {
120   traverse_simple_stack(traverser);
121 }
122 
123 
124 // A Job holds a traverser and a stack representing the job. The stack
125 // is changed along with the traverser, so it represents the state of
126 // the traverser.
127 class Job {
128   Traverser*  traverser;
129 
130   vector<TraverseState>*  stack;
131 
132   // The lowest index in the stack where there is some unfinished
133   // work. -1 if there is no unfinished work.
134   int  first_split;
135 
136   // For a given starting point, find the first index of the stack
137   // with unfinished work. -1 if there is no unfinished work.
find_first_split(vector<TraverseState> * stack,int start)138   static int  find_first_split( vector<TraverseState>*  stack,
139                                 int                     start )
140   {
141     auto  it = stack->begin();
142 
143     it += start;
144     while (it != stack->end() && it->next_index == it->next_count - 1) {
145       start++;
146       it++;
147     }
148 
149     if (it == stack->end()) {
150       return -1;
151     }
152     else if (it->next_index == -1 && it->next_count == 1) {
153       return -1;
154     }
155 
156     return  start;
157   }
158 
159 
160 public:
161 	bool aborting;														// Added by Anders
162   // Create a new Job. if first_split is not set (or it is -2), the
163   // first split will be found.
Job(vector<TraverseState> * stack=new vector<TraverseState> (),int first_split=-2)164   Job( vector<TraverseState>*  stack = new vector<TraverseState>(),
165        int                     first_split = -2 )
166   :aborting(false)														// Added by Anders
167   {
168     if (first_split == -2) {
169       first_split = find_first_split(stack, 0);
170     }
171 
172     this->stack        =  stack;
173     this->first_split  =  first_split;
174   }
175 
176 
~Job(void)177   ~Job( void )
178   {
179     delete stack;
180   }
181 
182 
183   // Set the traverser and move it to the start of this job based on
184   // its last job.
setTraverser(Traverser * traverser,Job * last_job)185   void  setTraverser( Traverser*  traverser,
186                       Job*        last_job )
187   {
188     vector<TraverseState>*  last_stack  =  last_job->stack;
189     unsigned int            i;
190 
191     this->traverser = traverser;
192 
193     // Find the first state where the stacks differ:
194     for (i = 0; i < last_stack->size() && i < stack->size(); i++) {
195       if ((*stack)[i].next_index != (*last_stack)[i].next_index) {
196         break;
197       }
198     }
199 
200     if (i > 0) {
201       // TODO: why is this needed?
202       i--;
203     }
204 
205     // roll back to the division point
206     while (last_stack->size() > i + 1) {
207       traverser->moveToPrev(last_stack->back().prev_index);
208       last_stack->pop_back();
209     }
210     if (!last_stack->empty()) {
211       last_stack->pop_back();
212     }
213 
214     // go forward so the traverser represents the new job
215     for (; i < stack->size() - 1; i++) {
216       traverser->moveToNext((*stack)[i].next_index, false);
217     }
218   }
219 
220 
221   // This function does some work and returns false when there is no
222   // more work. Otherwise does at least step_count steps and returns
223   // true when there is a subjob available.
step(int step_count)224   bool  step( int  step_count )
225   {
226     int  steps  =  0;
227 
228     // keep going if there is no subjob available
229     while (steps < step_count || first_split == -1) {
230       if (stack->empty()) {
231         return false;
232       }
233 
234       stack->back().next_index++;
235 
236       TraverseState  state  =  stack->back();
237 
238       if (state.next_index == state.next_count || aborting /* Added by Anders */) {
239         if (state.prev_index != -1) {
240           traverser->moveToPrev(state.prev_index);
241         }
242         stack->pop_back();
243 
244         if (first_split == -1) {
245           // There is no more work to do in the previous states
246           return false;
247         }
248       }
249       else {
250         if ((int) stack->size() - 1 == first_split) {
251           if (state.next_index == state.next_count - 1) {
252             first_split = -1;
253           }
254         }
255 
256         int  prev_index  =  traverser->moveToNext(state.next_index, true);
257         aborting=traverser->aborting;						/* Added by Anders */
258         steps++;
259 
260         if (prev_index == 0) {
261           int  count  =  traverser->getEdgeCountNext();
262 
263           traverser->collectInfo();
264 
265           if (first_split == -1 && count > 1) {
266             first_split = stack->size();
267           }
268 
269           stack->push_back(TraverseState(count, -1, prev_index));
270         }
271         else {
272           traverser->moveToPrev(prev_index);
273         }
274       }
275     }
276 
277     return  true;
278   }
279 
280 
281   // Get a new subjob of the current job and adjust the current job so
282   // it does not overlap with the new subjob. first_split may not be
283   // -1 when calling this function.
getSubjob(void)284   Job*  getSubjob( void )
285   {
286     // We can assume that first_split >= 0.
287     auto  it = stack->begin();
288 
289     it += first_split + 1;
290     // this new job will do the rest
291     vector<TraverseState>*  new_stack  =  new vector<TraverseState>(stack->begin(), it);
292     it--;
293 
294     // limit the existing job
295     it->next_count = it->next_index + 1;
296 
297     Job* new_job =  new Job(new_stack, find_first_split(new_stack, first_split));
298 
299     first_split = find_first_split(stack, first_split);
300 
301     return new_job;
302   }
303 
304 
print(void)305   void  print( void )
306   {
307     cout << "--cc-nn-pp----------" << endl;
308     for(vector<TraverseState>::const_iterator state=stack->begin();state!=stack->end();state++){
309       //    for (TraverseState state : *stack) {
310       cout << "  " << setw(2) << state->next_count << " " << setw(2) << state->next_index
311            << " " << setw(2) << state->prev_index << endl;
312     }
313     cout << "--------------------" << endl;
314   }
315 };
316 
317 
318 // This class is used to safely transfer a job from one thread to another
319 class JobTransfer {
320   bool  is_set;
321 
322   Job*  job;
323 
324   mutex  mtx;
325 
326   condition_variable  cond;
327 
328 public:
JobTransfer(void)329   JobTransfer( void )
330   {
331     is_set  =  false;
332   }
333 
334 
setJob(Job * job)335   void  setJob( Job*  job )
336   {
337     mtx.lock();
338 
339     this->job  =  job;
340     is_set     =  true;
341 
342     // notify should be done after unlock according to
343     // http://en.cppreference.com/w/cpp/thread/condition_variable/notify_one
344     // but doing so seems to cause a deadlock with 8 threads using
345     // SubsetTraversers of size 16 on my laptop
346     cond.notify_one();
347     mtx.unlock();
348   }
349 
350 
getJob(void)351   Job*  getJob( void )
352   {
353     unique_lock<mutex>  lock(mtx);
354 
355     while (!is_set) {
356       cond.wait(lock);
357     }
358 
359     lock.unlock();
360 
361     return job;
362   }
363 };
364 
365 
366 class JobCentral;
367 
368 // A struct for holding all the information needed by a thread.
369 struct ThreadContext {
370   JobCentral*  central;
371 
372   Traverser*  traverser;
373 
374   int  step_count;
375 
ThreadContextgfan::ThreadContext376   ThreadContext( JobCentral*  central,
377                  Traverser*   traverser,
378                  int          step_count )
379   {
380     this->central     =  central;
381     this->traverser   =  traverser;
382     this->step_count  =  step_count;
383   }
384 };
385 
386 
387 void  work( ThreadContext*  context );
388 
389 
390 // This class is used running the whole threaded traversal and for
391 // exchanging jobs between threads
392 class JobCentral {
393   ThreadContext**  contexts;
394 
395   int  context_count;
396 
397   int  step_count;
398 
399   Job*  first_job;
400 
401   mutex  mtx;
402 
403   // This queue is used for transferring jobs between threads. When a
404   // thread requests a job, an empty transfer is put in this queue. It
405   // will then be picked up by a thread that has a subjob
406   // available. The subjob is given to the JobTransfer and is then
407   // received by the thread needing it.
408   deque<JobTransfer*>*  transfers;
409 
410 public:
411 	bool aborting;									// Added by Anders
412   // step_count is the number of algorithm steps taken between
413   // possible job transfers. This value should be high (e.g. 100) if
414   // the traverser is very fast. If the traverser is slow, step_count
415   // should be one.
JobCentral(Traverser ** traversers,int count,int step_count)416   JobCentral( Traverser**  traversers,
417               int          count,
418               int          step_count )
419   :aborting(false)									// Added by Anders
420   {
421     context_count  =  count;
422     contexts       =  new ThreadContext*[count];
423 
424     for (int i = 0; i < count; i++) {
425       contexts[i] = new ThreadContext(this, traversers[i], step_count);
426     }
427 
428     transfers = new deque<JobTransfer*>();
429   }
430 
431 
~JobCentral(void)432   ~JobCentral( void )
433   {
434     delete transfers;
435     for (int i = 0; i < context_count; i++) {
436       delete contexts[i];
437     }
438     delete[] contexts;
439   }
440 
441 
442   // Run a job in multiple threads. This function is usually just
443   // called once with a job representing everything to be done.
runJob(Job * job)444   void  runJob( Job*  job )
445   {
446     thread**  thr  = new thread*[context_count];
447 
448     this->first_job  =  job;
449 
450     for (int i = 0; i < context_count; i++) {
451       thr[i] = new thread(work, contexts[i]);
452     }
453 
454     for (int i = 0; i < context_count; i++) {
455       thr[i]->join();
456       delete thr[i];
457     }
458 
459     delete[] thr;
460   }
461 
462 
hasTransfer(void)463   bool  hasTransfer( void )
464   {
465 	return !transfers->empty();
466   }
467 
468 
469   // If no threads are requesting jobs, the return value will be
470   // NULL. Otherwise a JobTransfer will be returned ready for
471   // receiving a new job.
getTransfer(void)472   JobTransfer*  getTransfer( void )
473   {
474     JobTransfer*  transfer  =  NULL;
475 
476     mtx.lock();
477 
478     if (!transfers->empty()) {
479       transfer = transfers->back();
480       transfers->pop_back();
481     }
482 
483     mtx.unlock();
484 
485     return  transfer;
486   }
487 
488 
489   // Requst a job from another thread. The return value is NULL if all
490   // jobs are done.
getJob(void)491   Job*  getJob( void )
492   {
493     Job*  job  =  NULL;
494 
495     mtx.lock();
496 
497     if (first_job != NULL) {
498       job        =  first_job;
499       first_job  =  NULL;
500     }
501     else if ((int) transfers->size() < context_count - 1) {
502       JobTransfer*  transfer =  new JobTransfer();
503 
504       transfers->push_front(transfer);
505 
506       mtx.unlock();
507 
508       job =  transfer->getJob();
509 
510       delete transfer;
511 
512       // return now because the mutex is already unlocked
513       return job;
514     }
515     else {
516       // We are fully done
517 
518       for(deque<JobTransfer*>::const_iterator tr=transfers->begin();tr!=transfers->end();tr++){
519 	//      for (JobTransfer*  tr : *transfers) {
520         (*tr)->setJob(NULL);
521       }
522     }
523 
524     mtx.unlock();
525 
526     return job;
527   }
528 };
529 
530 
531 // Do the actual work
work(ThreadContext * context)532 void  work( ThreadContext*  context )
533 {
534   Traverser*   traverser   =  context->traverser;
535   JobCentral*  central     =  context->central;
536   Job*         job;
537   Job*         last_job   =  new Job();
538 
539   while ((job = central->getJob()) != NULL) {
540     job->setTraverser(traverser, last_job);
541 
542     int  step_count  =  central->hasTransfer() ? 1 : context->step_count;
543 
544     if(central->aborting)job->aborting=true;			// Added by Anders
545 
546     while (job->step(step_count)) {
547       if(job->aborting)central->aborting=true;			// Added by Anders
548       JobTransfer*  transfer  =  central->getTransfer();
549 
550       if (transfer != NULL) {
551         transfer->setJob(job->getSubjob());
552       }
553 
554       step_count  =  central->hasTransfer() ? 1 : context->step_count;
555     }
556 
557     delete last_job;
558     last_job = job;
559   }
560 
561   delete last_job;
562 }
563 
564 
565 // Do the actual work
work2(ThreadContext * context)566 void  work2( ThreadContext*  context )
567 {
568   Traverser*   traverser   =  context->traverser;
569   JobCentral*  central     =  context->central;
570   int          step_count  =  context->step_count;
571   Job*         job;
572   Job*         last_job   =  new Job();
573 
574   while ((job = central->getJob()) != NULL) {
575     job->setTraverser(traverser, last_job);
576 
577     while (job->step(step_count)) {
578       JobTransfer*  transfer  =  central->getTransfer();
579 
580       if (transfer != NULL) {
581         transfer->setJob(job->getSubjob());
582       }
583     }
584 
585     delete last_job;
586     last_job = job;
587   }
588 
589   delete last_job;
590 }
591 
592 
traverse_threaded(Traverser ** traversers,int count,int step_count)593 void  traverse_threaded( Traverser**  traversers,
594                          int          count,
595                          int          step_count )
596 {
597   JobCentral*  central  =  new JobCentral(traversers, count, step_count);
598 
599   central->runJob(new Job(create_first_job_stack(traversers[0])));
600 
601   delete central;
602 }
603 }
604