1 // Parallel abstract graph traverser. Contributed by Bjarne Knudsen.
2
#include <atomic>
#include <condition_variable>
#include <deque>
#include <iomanip>
#include <iostream>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>
//#include <chrono>
10
11 #include "gfanlib_paralleltraverser.h"
12
13 /*
14 TODO:
15
16 - Consider giving JobTransfers an affinity to a specific thread.
17
18 - Consider if the amount of entries to be copied in the stack for
19 deep searches can be reduced.
20
21 - Consider making a more compact stack for limited in and out edge
22 counts.
23 */
24
25 using namespace std;
26
27 namespace gfan{
28
// One step of a traversal, i.e. one entry of a traversal stack.
struct TraverseState {
  // How many next edges this state has
  int next_count;

  // The next edge that was (or is currently being) followed; -1 before
  // the first edge of this state has been taken
  int next_index;

  // The edge that leads back to the previous state; -1 for the start
  // state of a traversal
  int prev_index;

  TraverseState( int next_count,
                 int next_index,
                 int prev_index )
    : next_count(next_count),
      next_index(next_index),
      prev_index(prev_index)
  {
  }
};
49
50
traverse_simple_recursive(Traverser * traverser)51 void traverse_simple_recursive( Traverser* traverser )
52 {
53 int count = traverser->getEdgeCountNext();
54
55 traverser->collectInfo();
56
57 for (int i = 0; i < count; i++) {
58 int prev_index = traverser->moveToNext(i, true);
59
60 if (prev_index == 0) {
61 // Only traverse each state once, so do it for the zero'th
62 // parent.
63 traverse_simple_recursive(traverser);
64 }
65
66 traverser->moveToPrev(prev_index);
67 }
68 }
69
70
71 // This function creates a stack that represents the full job to be
72 // done. Info is also collected for the start state.
create_first_job_stack(Traverser * traverser)73 vector<TraverseState>* create_first_job_stack( Traverser* traverser )
74 {
75 vector<TraverseState>* stack = new vector<TraverseState>();
76
77 stack->push_back(TraverseState(traverser->getEdgeCountNext(), -1, -1));
78
79 traverser->collectInfo();
80
81 return stack;
82 }
83
84
traverse_simple_stack(Traverser * traverser)85 void traverse_simple_stack( Traverser* traverser )
86 {
87 // The stack holds information about what we have done so far
88 vector<TraverseState>* stack = create_first_job_stack(traverser);
89
90 while (!stack->empty()) {
91 stack->back().next_index++;
92
93 TraverseState state = stack->back();
94
95 if (state.next_index == state.next_count || traverser->aborting /* Added by Anders */) {
96 if (state.prev_index != -1) {
97 traverser->moveToPrev(state.prev_index);
98 }
99 stack->pop_back();
100 }
101 else {
102 int prev_index = traverser->moveToNext(state.next_index, true);
103
104 if (prev_index == 0) {
105 traverser->collectInfo();
106 stack->push_back(TraverseState(traverser->getEdgeCountNext(), -1, prev_index));
107 }
108 else {
109 traverser->moveToPrev(prev_index);
110 }
111 }
112 }
113
114 delete stack;
115 }
116
117
traverse_simple(Traverser * traverser)118 void traverse_simple( Traverser* traverser )
119 {
120 traverse_simple_stack(traverser);
121 }
122
123
124 // A Job holds a traverser and a stack representing the job. The stack
125 // is changed along with the traverser, so it represents the state of
126 // the traverser.
127 class Job {
128 Traverser* traverser;
129
130 vector<TraverseState>* stack;
131
132 // The lowest index in the stack where there is some unfinished
133 // work. -1 if there is no unfinished work.
134 int first_split;
135
136 // For a given starting point, find the first index of the stack
137 // with unfinished work. -1 if there is no unfinished work.
find_first_split(vector<TraverseState> * stack,int start)138 static int find_first_split( vector<TraverseState>* stack,
139 int start )
140 {
141 auto it = stack->begin();
142
143 it += start;
144 while (it != stack->end() && it->next_index == it->next_count - 1) {
145 start++;
146 it++;
147 }
148
149 if (it == stack->end()) {
150 return -1;
151 }
152 else if (it->next_index == -1 && it->next_count == 1) {
153 return -1;
154 }
155
156 return start;
157 }
158
159
160 public:
161 bool aborting; // Added by Anders
162 // Create a new Job. if first_split is not set (or it is -2), the
163 // first split will be found.
Job(vector<TraverseState> * stack=new vector<TraverseState> (),int first_split=-2)164 Job( vector<TraverseState>* stack = new vector<TraverseState>(),
165 int first_split = -2 )
166 :aborting(false) // Added by Anders
167 {
168 if (first_split == -2) {
169 first_split = find_first_split(stack, 0);
170 }
171
172 this->stack = stack;
173 this->first_split = first_split;
174 }
175
176
~Job(void)177 ~Job( void )
178 {
179 delete stack;
180 }
181
182
183 // Set the traverser and move it to the start of this job based on
184 // its last job.
setTraverser(Traverser * traverser,Job * last_job)185 void setTraverser( Traverser* traverser,
186 Job* last_job )
187 {
188 vector<TraverseState>* last_stack = last_job->stack;
189 unsigned int i;
190
191 this->traverser = traverser;
192
193 // Find the first state where the stacks differ:
194 for (i = 0; i < last_stack->size() && i < stack->size(); i++) {
195 if ((*stack)[i].next_index != (*last_stack)[i].next_index) {
196 break;
197 }
198 }
199
200 if (i > 0) {
201 // TODO: why is this needed?
202 i--;
203 }
204
205 // roll back to the division point
206 while (last_stack->size() > i + 1) {
207 traverser->moveToPrev(last_stack->back().prev_index);
208 last_stack->pop_back();
209 }
210 if (!last_stack->empty()) {
211 last_stack->pop_back();
212 }
213
214 // go forward so the traverser represents the new job
215 for (; i < stack->size() - 1; i++) {
216 traverser->moveToNext((*stack)[i].next_index, false);
217 }
218 }
219
220
221 // This function does some work and returns false when there is no
222 // more work. Otherwise does at least step_count steps and returns
223 // true when there is a subjob available.
step(int step_count)224 bool step( int step_count )
225 {
226 int steps = 0;
227
228 // keep going if there is no subjob available
229 while (steps < step_count || first_split == -1) {
230 if (stack->empty()) {
231 return false;
232 }
233
234 stack->back().next_index++;
235
236 TraverseState state = stack->back();
237
238 if (state.next_index == state.next_count || aborting /* Added by Anders */) {
239 if (state.prev_index != -1) {
240 traverser->moveToPrev(state.prev_index);
241 }
242 stack->pop_back();
243
244 if (first_split == -1) {
245 // There is no more work to do in the previous states
246 return false;
247 }
248 }
249 else {
250 if ((int) stack->size() - 1 == first_split) {
251 if (state.next_index == state.next_count - 1) {
252 first_split = -1;
253 }
254 }
255
256 int prev_index = traverser->moveToNext(state.next_index, true);
257 aborting=traverser->aborting; /* Added by Anders */
258 steps++;
259
260 if (prev_index == 0) {
261 int count = traverser->getEdgeCountNext();
262
263 traverser->collectInfo();
264
265 if (first_split == -1 && count > 1) {
266 first_split = stack->size();
267 }
268
269 stack->push_back(TraverseState(count, -1, prev_index));
270 }
271 else {
272 traverser->moveToPrev(prev_index);
273 }
274 }
275 }
276
277 return true;
278 }
279
280
281 // Get a new subjob of the current job and adjust the current job so
282 // it does not overlap with the new subjob. first_split may not be
283 // -1 when calling this function.
getSubjob(void)284 Job* getSubjob( void )
285 {
286 // We can assume that first_split >= 0.
287 auto it = stack->begin();
288
289 it += first_split + 1;
290 // this new job will do the rest
291 vector<TraverseState>* new_stack = new vector<TraverseState>(stack->begin(), it);
292 it--;
293
294 // limit the existing job
295 it->next_count = it->next_index + 1;
296
297 Job* new_job = new Job(new_stack, find_first_split(new_stack, first_split));
298
299 first_split = find_first_split(stack, first_split);
300
301 return new_job;
302 }
303
304
print(void)305 void print( void )
306 {
307 cout << "--cc-nn-pp----------" << endl;
308 for(vector<TraverseState>::const_iterator state=stack->begin();state!=stack->end();state++){
309 // for (TraverseState state : *stack) {
310 cout << " " << setw(2) << state->next_count << " " << setw(2) << state->next_index
311 << " " << setw(2) << state->prev_index << endl;
312 }
313 cout << "--------------------" << endl;
314 }
315 };
316
317
318 // This class is used to safely transfer a job from one thread to another
319 class JobTransfer {
320 bool is_set;
321
322 Job* job;
323
324 mutex mtx;
325
326 condition_variable cond;
327
328 public:
JobTransfer(void)329 JobTransfer( void )
330 {
331 is_set = false;
332 }
333
334
setJob(Job * job)335 void setJob( Job* job )
336 {
337 mtx.lock();
338
339 this->job = job;
340 is_set = true;
341
342 // notify should be done after unlock according to
343 // http://en.cppreference.com/w/cpp/thread/condition_variable/notify_one
344 // but doing so seems to cause a deadlock with 8 threads using
345 // SubsetTraversers of size 16 on my laptop
346 cond.notify_one();
347 mtx.unlock();
348 }
349
350
getJob(void)351 Job* getJob( void )
352 {
353 unique_lock<mutex> lock(mtx);
354
355 while (!is_set) {
356 cond.wait(lock);
357 }
358
359 lock.unlock();
360
361 return job;
362 }
363 };
364
365
366 class JobCentral;
367
368 // A struct for holding all the information needed by a thread.
369 struct ThreadContext {
370 JobCentral* central;
371
372 Traverser* traverser;
373
374 int step_count;
375
ThreadContextgfan::ThreadContext376 ThreadContext( JobCentral* central,
377 Traverser* traverser,
378 int step_count )
379 {
380 this->central = central;
381 this->traverser = traverser;
382 this->step_count = step_count;
383 }
384 };
385
386
387 void work( ThreadContext* context );
388
389
390 // This class is used running the whole threaded traversal and for
391 // exchanging jobs between threads
392 class JobCentral {
393 ThreadContext** contexts;
394
395 int context_count;
396
397 int step_count;
398
399 Job* first_job;
400
401 mutex mtx;
402
403 // This queue is used for transferring jobs between threads. When a
404 // thread requests a job, an empty transfer is put in this queue. It
405 // will then be picked up by a thread that has a subjob
406 // available. The subjob is given to the JobTransfer and is then
407 // received by the thread needing it.
408 deque<JobTransfer*>* transfers;
409
410 public:
411 bool aborting; // Added by Anders
412 // step_count is the number of algorithm steps taken between
413 // possible job transfers. This value should be high (e.g. 100) if
414 // the traverser is very fast. If the traverser is slow, step_count
415 // should be one.
JobCentral(Traverser ** traversers,int count,int step_count)416 JobCentral( Traverser** traversers,
417 int count,
418 int step_count )
419 :aborting(false) // Added by Anders
420 {
421 context_count = count;
422 contexts = new ThreadContext*[count];
423
424 for (int i = 0; i < count; i++) {
425 contexts[i] = new ThreadContext(this, traversers[i], step_count);
426 }
427
428 transfers = new deque<JobTransfer*>();
429 }
430
431
~JobCentral(void)432 ~JobCentral( void )
433 {
434 delete transfers;
435 for (int i = 0; i < context_count; i++) {
436 delete contexts[i];
437 }
438 delete[] contexts;
439 }
440
441
442 // Run a job in multiple threads. This function is usually just
443 // called once with a job representing everything to be done.
runJob(Job * job)444 void runJob( Job* job )
445 {
446 thread** thr = new thread*[context_count];
447
448 this->first_job = job;
449
450 for (int i = 0; i < context_count; i++) {
451 thr[i] = new thread(work, contexts[i]);
452 }
453
454 for (int i = 0; i < context_count; i++) {
455 thr[i]->join();
456 delete thr[i];
457 }
458
459 delete[] thr;
460 }
461
462
hasTransfer(void)463 bool hasTransfer( void )
464 {
465 return !transfers->empty();
466 }
467
468
469 // If no threads are requesting jobs, the return value will be
470 // NULL. Otherwise a JobTransfer will be returned ready for
471 // receiving a new job.
getTransfer(void)472 JobTransfer* getTransfer( void )
473 {
474 JobTransfer* transfer = NULL;
475
476 mtx.lock();
477
478 if (!transfers->empty()) {
479 transfer = transfers->back();
480 transfers->pop_back();
481 }
482
483 mtx.unlock();
484
485 return transfer;
486 }
487
488
489 // Requst a job from another thread. The return value is NULL if all
490 // jobs are done.
getJob(void)491 Job* getJob( void )
492 {
493 Job* job = NULL;
494
495 mtx.lock();
496
497 if (first_job != NULL) {
498 job = first_job;
499 first_job = NULL;
500 }
501 else if ((int) transfers->size() < context_count - 1) {
502 JobTransfer* transfer = new JobTransfer();
503
504 transfers->push_front(transfer);
505
506 mtx.unlock();
507
508 job = transfer->getJob();
509
510 delete transfer;
511
512 // return now because the mutex is already unlocked
513 return job;
514 }
515 else {
516 // We are fully done
517
518 for(deque<JobTransfer*>::const_iterator tr=transfers->begin();tr!=transfers->end();tr++){
519 // for (JobTransfer* tr : *transfers) {
520 (*tr)->setJob(NULL);
521 }
522 }
523
524 mtx.unlock();
525
526 return job;
527 }
528 };
529
530
531 // Do the actual work
work(ThreadContext * context)532 void work( ThreadContext* context )
533 {
534 Traverser* traverser = context->traverser;
535 JobCentral* central = context->central;
536 Job* job;
537 Job* last_job = new Job();
538
539 while ((job = central->getJob()) != NULL) {
540 job->setTraverser(traverser, last_job);
541
542 int step_count = central->hasTransfer() ? 1 : context->step_count;
543
544 if(central->aborting)job->aborting=true; // Added by Anders
545
546 while (job->step(step_count)) {
547 if(job->aborting)central->aborting=true; // Added by Anders
548 JobTransfer* transfer = central->getTransfer();
549
550 if (transfer != NULL) {
551 transfer->setJob(job->getSubjob());
552 }
553
554 step_count = central->hasTransfer() ? 1 : context->step_count;
555 }
556
557 delete last_job;
558 last_job = job;
559 }
560
561 delete last_job;
562 }
563
564
565 // Do the actual work
work2(ThreadContext * context)566 void work2( ThreadContext* context )
567 {
568 Traverser* traverser = context->traverser;
569 JobCentral* central = context->central;
570 int step_count = context->step_count;
571 Job* job;
572 Job* last_job = new Job();
573
574 while ((job = central->getJob()) != NULL) {
575 job->setTraverser(traverser, last_job);
576
577 while (job->step(step_count)) {
578 JobTransfer* transfer = central->getTransfer();
579
580 if (transfer != NULL) {
581 transfer->setJob(job->getSubjob());
582 }
583 }
584
585 delete last_job;
586 last_job = job;
587 }
588
589 delete last_job;
590 }
591
592
traverse_threaded(Traverser ** traversers,int count,int step_count)593 void traverse_threaded( Traverser** traversers,
594 int count,
595 int step_count )
596 {
597 JobCentral* central = new JobCentral(traversers, count, step_count);
598
599 central->runJob(new Job(create_first_job_stack(traversers[0])));
600
601 delete central;
602 }
603 }
604