1 //
2 // Copyright (c) 2010-2017 Benjamin Kaufmann
3 //
4 // This file is part of Clasp. See http://www.cs.uni-potsdam.de/clasp/
5 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to
8 // deal in the Software without restriction, including without limitation the
9 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10 // sell copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
12 //
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
15 //
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22 // IN THE SOFTWARE.
23 //
24 #include <clasp/mt/parallel_solve.h>
25 #include <clasp/solver.h>
26 #include <clasp/clause.h>
27 #include <clasp/enumerator.h>
28 #include <clasp/util/timer.h>
29 #include <clasp/minimize_constraint.h>
30 #include <clasp/mt/mutex.h>
31 #include <potassco/string_convert.h>
32 namespace Clasp { namespace mt {
33 /////////////////////////////////////////////////////////////////////////////////////////
34 // BarrierSemaphore
35 /////////////////////////////////////////////////////////////////////////////////////////
36 // A combination of a barrier and a semaphore
// A combination of a barrier and a counting semaphore that share a single
// counter, mutex, and condition variable.
//
// counter_ >= 0: number of down() calls that succeed without blocking.
// counter_ <  0: -counter_ threads have called down()/wait() without finding
//                a permit and are (or are about to be) blocked on semCond_.
// active_      : number of parties taking part in the barrier.
// The barrier is tripped ("active") once -counter_ >= active_, i.e. once every
// party is blocked. The last arriving thread is not put to sleep; it observes
// the tripped barrier and thereby becomes the leader.
class BarrierSemaphore {
public:
	// Creates an object with the given initial counter and number of parties.
	explicit BarrierSemaphore(int counter = 0, int maxParties = 1) : counter_(counter), active_(maxParties) {}
	// Initializes this object
	// PRE: no thread is blocked on the semaphore
	//      (i.e. internal counter is >= 0)
	// NOTE: not thread-safe
	void unsafe_init(int counter = 0, int maxParties = 1) {
		counter_ = counter;
		active_ = maxParties;
	}
	// Returns the current semaphore counter.
	int counter() { lock_guard<mutex> lock(semMutex_); return counter_; }
	// Returns the number of parties required to trip this barrier.
	int parties() { lock_guard<mutex> lock(semMutex_); return active_; }
	// Returns true if all parties are waiting at the barrier
	bool active() { lock_guard<mutex> lock(semMutex_); return unsafe_active(); }

	// barrier interface

	// Increases the barrier count, i.e. the number of
	// parties required to trip this barrier.
	void addParty() {
		lock_guard<mutex> lock(semMutex_);
		++active_;
	}
	// Decreases the barrier count and resets the barrier
	// if reset is true.
	// If reset is false and the remaining parties are now all blocked,
	// one of them is woken so that it can act as the leader.
	// Returns the number of parties before the removal.
	// PRE: the thread does not itself wait on the barrier
	int removeParty(bool reset) {
		unique_lock<mutex> lock(semMutex_);
		assert(active_ > 0);
		int res = active_--;
		if (reset) { unsafe_reset(0); }
		// Barrier tripped by the removal: pin the counter to -active_ so that the
		// woken thread sees unsafe_active() == true, then notify outside the lock.
		else if (unsafe_active()) { counter_ = -active_; lock.unlock(); semCond_.notify_one(); }
		return res;
	}
	// Waits until all parties have arrived, i.e. called wait.
	// Exactly one of the parties will receive a return value of true,
	// the others will receive a value of false.
	// Applications shall use this value to designate one thread as the
	// leader that will eventually reset the barrier thereby unblocking the other threads.
	// Any available semaphore permits are discarded: afterwards the counter is at most -1.
	bool wait() {
		unique_lock<mutex> lock(semMutex_);
		if (--counter_ >= 0) { counter_ = -1; }
		return unsafe_wait(lock);
	}
	// Resets the barrier and unblocks any waiting threads.
	// The semaphore counter is set to semCount.
	void reset(uint32 semCount = 0) {
		lock_guard<mutex> lock(semMutex_);
		unsafe_reset(semCount);
	}
	// semaphore interface

	// Decrement the semaphore's counter.
	// If the counter is zero or less prior to the call
	// the calling thread is suspended.
	// Returns false to signal that all but the calling thread
	// are currently blocked.
	bool down() {
		unique_lock<mutex> lock(semMutex_);
		return down(lock);
	}
	// LOW-LEVEL version of down
	// PRE: m owns the lock on toMutex().
	bool down(unique_lock<mutex>& m) {
		assert(m.owns_lock());
		if (--counter_ >= 0) { return true; }
		return !unsafe_wait(m);
	}
	// Increments the semaphore's counter and resumes
	// one thread which has called down() if the counter
	// was less than zero prior to the call.
	void up() {
		unique_lock<mutex> lock(semMutex_);
		up(lock);
	}
	// LOW-LEVEL version of up
	// PRE: m owns the lock on toMutex().
	// If a thread is resumed and transferLock is true, m is unlocked before
	// notifying; in all other cases m is still locked on return.
	void up(unique_lock<mutex>& m, bool transferLock = true) {
		assert(m.owns_lock());
		if (++counter_ < 1) {
			if (transferLock) { m.unlock(); }
			semCond_.notify_one();
		}
	}
	// Gives access to the internal mutex, e.g. for use with the LOW-LEVEL functions.
	mutex& toMutex() { return semMutex_; }
private:
	// Copying is disabled (private declarations, pre-C++11 style).
	BarrierSemaphore(const BarrierSemaphore&);
	BarrierSemaphore& operator=(const BarrierSemaphore&);
	typedef condition_variable cv;
	// True if all parties are blocked. PRE: semMutex_ is held.
	bool unsafe_active() const { return -counter_ >= active_; }
	// Sets the counter to semCount and wakes all threads if any were blocked.
	// PRE: semMutex_ is held.
	void unsafe_reset(uint32 semCount) {
		int prev = counter_;
		counter_ = semCount;
		if (prev < 0) { semCond_.notify_all(); }
	}
	// Returns true for the leader, else false
	// PRE: lock holds semMutex_ and counter_ < 0.
	// NOTE(review): the condition wait has no predicate loop, so a spurious
	// wakeup returns early (typically with false); callers must tolerate
	// this - verify for callers of wait().
	bool unsafe_wait(unique_lock<mutex>& lock) {
		assert(counter_ < 0);
		// don't put the last thread to sleep!
		if (!unsafe_active()) {
			semCond_.wait(lock);
		}
		return unsafe_active();
	}
	cv semCond_; // waiting threads
	mutex semMutex_; // mutex for updating counter
	int counter_; // semaphore's counter
	int active_; // number of active threads
};
146 /////////////////////////////////////////////////////////////////////////////////////////
147 // ParallelSolve::Impl
148 /////////////////////////////////////////////////////////////////////////////////////////
// State shared by all threads of one ParallelSolve object: control/message
// flags, the work queue used in splitting mode, the global restart schedule,
// error information, and the optional model generator used by doStart()/doNext().
struct ParallelSolve::SharedData {
	typedef PodQueue<const LitVec*> Queue;
	// Bits stored in control.
	enum MsgFlag {
		terminate_flag = 1u, sync_flag = 2u, split_flag = 4u,
		restart_flag = 8u, complete_flag = 16u,
		interrupt_flag = 32u, // set on terminate from outside
		allow_split_flag = 64u, // set if splitting mode is active
		forbid_restart_flag = 128u,// set if restarts are no longer allowed
		cancel_restart_flag = 256u,// set if current restart request was cancelled by some thread
		restart_abandoned_flag= 512u,// set to signal that threads must not give up their gp
	};
	// Messages accepted by postMessage(); each is a combination of MsgFlag bits.
	enum Message {
		msg_terminate = (terminate_flag),
		msg_interrupt = (terminate_flag | interrupt_flag),
		msg_sync_restart = (sync_flag | restart_flag),
		msg_split = split_flag
	};
	// Hand-over between the thread calling doNext() and the master solver
	// thread when models are generated one at a time (doStart()/doNext()).
	struct Generator {
		enum State { start = 0, search = 1, model = 2, done = 3 };
		Generator() : state(start) {}
		mutex genM; // protects state
		condition_variable cond; // signalled on every state change
		// Blocks while state == st; returns the first other state observed.
		State waitWhile(State st) {
			State r;
			for (unique_lock<mutex> lock(genM); (r = state) == st;) { cond.wait(lock); }
			return r;
		}
		// Publishes a model and blocks until the state changes again.
		void pushModel() {
			notify(model);
			waitWhile(model);
		}
		// Sets the state to st and wakes one waiting thread.
		void notify(State st) {
			unique_lock<mutex> lock(genM);
			state = st;
			cond.notify_one();
		}
		// NOTE(review): ParallelSolve::doNext() reads state without holding genM.
		State state;
	};
	SharedData() : path(0) { reset(0); control = 0; }
	// Prepares for a new solve operation on a_ctx (may be 0).
	// The barrier parties of workSem are set to a_ctx->concurrency() (0 if a_ctx is 0).
	// NOTE: control, modCount, and lower_ are not touched here.
	void reset(SharedContext* a_ctx) {
		clearQueue();
		syncT.reset();
		workSem.unsafe_init(0, a_ctx ? a_ctx->concurrency() : 0);
		msg.clear();
		globalR.reset();
		maxConflict = globalR.current();
		errorSet = 0;
		initVec = 0;
		ctx = a_ctx;
		path = 0;
		nextId = 1;
		workReq = 0;
		restartReq = 0;
		generator = 0;
		errorCode = 0;
	}
	// Deletes all queued guiding paths.
	// NOTE: does not lock workSem's mutex - callers must ensure exclusive access.
	void clearQueue() {
		while (!workQ.empty()) { delete workQ.pop_ret(); }
		workQ.clear();
	}
	// Returns the next guiding path for s or 0 if there is none.
	// - Portfolio mode (splitting not allowed): every solver whose bit is set in
	//   initVec receives the initial path once.
	// - Splitting mode: only the first solver that claims initVec receives the
	//   initial path; all others post a split request and block on workSem
	//   until some thread pushes a path via pushWork().
	// Returns 0 if down() reports that all other threads are blocked or if a
	// terminate or sync request is pending.
	// NOTE: the bit mask assumes s.id() < 64.
	const LitVec* requestWork(const Solver& s) {
		const uint64 m(uint64(1) << s.id());
		if ((initVec & m) == m) {
			if (!allowSplit()) {
				// portfolio mode - all solvers can start with initial path
				initVec -= m;
				return path;
			}
			else if (initVec.exchange(0) != 0) {
				// splitting mode - only one solver must start with initial path
				return path;
			}
		}
		if (!allowSplit()) { return 0; }
		// try to get work from split
		ctx->report(MessageEvent(s, "SPLIT", MessageEvent::sent));
		const uint32 flags = uint32(terminate_flag) | uint32(sync_flag);
		for (unique_lock<mutex> lock(workSem.toMutex());;) {
			if (!workQ.empty()) {
				const LitVec* res = workQ.pop_ret();
				// reset the queue once it ran empty (presumably rewinds its read position)
				if (workQ.empty()) { workQ.clear(); }
				return res;
			}
			postMessage(SharedData::msg_split, false);
			if (!workSem.down(lock) || hasControl(flags)) {
				return 0;
			}
		}
	}
	// Enqueues v while holding workSem's mutex and wakes one thread blocked in requestWork().
	void pushWork(const LitVec* v) {
		unique_lock<mutex> lock(workSem.toMutex());
		workQ.push(v);
		workSem.up(lock);
	}
	// MESSAGES
	bool postMessage(Message m, bool notify);
	// True if a terminate, sync, or split message is pending (bits 1|2|4).
	bool hasMessage() const { return (control & uint32(7)) != 0; }
	bool synchronize() const { return (control & uint32(sync_flag)) != 0; }
	bool terminate() const { return (control & uint32(terminate_flag)) != 0; }
	bool split() const { return (control & uint32(split_flag)) != 0; }
	// Marks one pending split request as handled; split_flag is updated once none are left.
	void aboutToSplit() { if (--workReq == 0) updateSplitFlag(); }
	void updateSplitFlag();
	// CONTROL FLAGS
	bool hasControl(uint32 f) const { return (control & f) != 0; }
	bool interrupt() const { return hasControl(interrupt_flag);}
	bool complete() const { return hasControl(complete_flag); }
	bool restart() const { return hasControl(restart_flag); }
	bool allowSplit() const { return hasControl(allow_split_flag); }
	bool allowRestart() const { return !hasControl(forbid_restart_flag); }
	// Sets flags; returns true if at least one of them was not set before.
	bool setControl(uint32 flags) { return (control.fetch_or(flags) & flags) != flags; }
	// Clears flags; returns true if all of them were set before.
	bool clearControl(uint32 flags) { return (control.fetch_and(~flags) & flags) == flags; }
	typedef SingleOwnerPtr<Generator> GeneratorPtr;
	Potassco::StringBuilder msg; // global error message
	ScheduleStrategy globalR; // global restart strategy
	uint64 maxConflict; // current restart limit
	atomic<uint64> errorSet; // bitmask of erroneous solvers
	SharedContext* ctx; // shared context object
	const LitVec* path; // initial guiding path - typically empty
	atomic<uint64> initVec; // vector of initial gp - represented as bitset
	GeneratorPtr generator; // optional data for model generation
	Timer<RealTime> syncT; // thread sync time
	mutex modelM; // model-mutex
	BarrierSemaphore workSem; // work-semaphore
	Queue workQ; // work-queue (must be protected by workSem)
	uint32 nextId; // next solver id to use
	LowerBound lower_; // last reported lower bound (if any)
	atomic<int> workReq; // > 0: someone needs work
	atomic<uint32> restartReq; // == numThreads(): restart
	atomic<uint32> control; // set of active message flags
	atomic<uint32> modCount; // counter for synchronizing models
	uint32 errorCode; // global error code
};
281
282 // post message to all threads
postMessage(Message m,bool notifyWaiting)283 bool ParallelSolve::SharedData::postMessage(Message m, bool notifyWaiting) {
284 if (m == msg_split) {
285 if (++workReq == 1) { updateSplitFlag(); }
286 return true;
287 }
288 else if (setControl(m)) {
289 // control message - notify all if requested
290 if (notifyWaiting) workSem.reset();
291 if ((uint32(m) & uint32(sync_flag|terminate_flag)) != 0) {
292 syncT.reset();
293 syncT.start();
294 }
295 return true;
296 }
297 return false;
298 }
299
updateSplitFlag()300 void ParallelSolve::SharedData::updateSplitFlag() {
301 for (bool splitF;;) {
302 splitF = (workReq > 0);
303 if (split() == splitF) return;
304 if (splitF) control.fetch_or(uint32(split_flag));
305 else control.fetch_and(~uint32(split_flag));
306 }
307 }
308 /////////////////////////////////////////////////////////////////////////////////////////
309 // ParallelSolve
310 /////////////////////////////////////////////////////////////////////////////////////////
// Creates a parallel solve algorithm configured from opts.
// The initial values of maxRestarts_, intGrace_, and intFlags_ are only
// placeholders: setRestarts() and setIntegrate() in the body overwrite
// them with the values from opts.
ParallelSolve::ParallelSolve(const ParallelSolveOptions& opts)
	: SolveAlgorithm(opts.limit)
	, shared_(new SharedData)
	, thread_(0)
	, distribution_(opts.distribute)
	, maxRestarts_(0)
	, intGrace_(1024)
	, intTopo_(opts.integrate.topo)
	, intFlags_(ClauseCreator::clause_not_root_sat | ClauseCreator::clause_no_add)
	, modeSplit_(opts.algorithm.mode == ParallelSolveOptions::Algorithm::mode_split) {
	setRestarts(opts.restarts.maxR, opts.restarts.sched);
	setIntegrate(opts.integrate.grace, opts.integrate.filter);
}
324
// Destroys the algorithm. If client threads were started (nextId > 1) but
// not yet joined (joinThreads() resets nextId to 1), they are interrupted
// and joined first. Finally the master's handler and the shared data are released.
ParallelSolve::~ParallelSolve() {
	if (shared_->nextId > 1) {
		// algorithm was not started but there may be active threads -
		// force orderly shutdown
		// (qualified call: no virtual dispatch from within the destructor)
		ParallelSolve::doInterrupt();
		// leave the barrier and wake all threads blocked on workSem
		shared_->workSem.removeParty(true);
		joinThreads();
	}
	destroyThread(masterId);
	delete shared_;
}
336
// Prepares a new solve operation on ctx with initial guiding path `path`
// and starts one new thread running solveParallel() for every solver except
// the master. The master's handler is allocated, but the master itself is
// not started here (see doSolve() and doStart()).
// Returns false without starting anything if the terminate flag is still set.
bool ParallelSolve::beginSolve(SharedContext& ctx, const LitVec& path) {
	assert(ctx.concurrency() && "Illegal number of threads");
	if (shared_->terminate()) { return false; }
	shared_->reset(&ctx);
	if (!enumerator().supportsParallel() && numThreads() > 1) {
		// fall back to a single thread: one barrier party, no splitting
		ctx.warn("Selected reasoning mode implies #Threads=1.");
		shared_->workSem.unsafe_init(1);
		modeSplit_ = false;
		ctx.setConcurrency(1, SharedContext::resize_reserve);
	}
	// splitting mode allows splits; competition (portfolio) mode forbids global restarts
	shared_->setControl(modeSplit_ ? SharedData::allow_split_flag : SharedData::forbid_restart_flag);
	shared_->modCount = uint32(enumerator().optimize());
	shared_->path = &path;
	// install a distributor for clause sharing if configured and none is present yet
	if (distribution_.types != 0 && ctx.distributor.get() == 0 && numThreads() > 1) {
		if (distribution_.mode == ParallelSolveOptions::Distribution::mode_local) {
			ctx.distributor.reset(new mt::LocalDistribution(distribution_, ctx.concurrency(), intTopo_));
		}
		else {
			ctx.distributor.reset(new mt::GlobalDistribution(distribution_, ctx.concurrency(), intTopo_));
		}
	}
	shared_->setControl(SharedData::sync_flag); // force initial sync with all threads
	shared_->syncT.start();
	reportProgress(MessageEvent(*ctx.master(), "SYNC", MessageEvent::sent));
	assert(ctx.master()->id() == masterId);
	allocThread(masterId, *ctx.master());
	for (uint32 i = 1; i != ctx.concurrency(); ++i) {
		uint32 id = shared_->nextId++;
		allocThread(id, *ctx.solver(id));
		// start in new thread
		// NOTE(review): std::mem_fun is deprecated since C++11 and removed in C++17.
		Clasp::mt::thread x(std::mem_fun(&ParallelSolve::solveParallel), this, id);
		thread_[id]->setThread(x);
	}
	return true;
}
372
setIntegrate(uint32 grace,uint8 filter)373 void ParallelSolve::setIntegrate(uint32 grace, uint8 filter) {
374 typedef ParallelSolveOptions::Integration Dist;
375 intGrace_ = grace;
376 intFlags_ = ClauseCreator::clause_no_add;
377 if (filter == Dist::filter_heuristic) { store_set_bit(intFlags_, 31); }
378 if (filter != Dist::filter_no) { intFlags_ |= ClauseCreator::clause_not_root_sat; }
379 if (filter == Dist::filter_sat) { intFlags_ |= ClauseCreator::clause_not_sat; }
380 }
381
setRestarts(uint32 maxR,const ScheduleStrategy & rs)382 void ParallelSolve::setRestarts(uint32 maxR, const ScheduleStrategy& rs) {
383 maxRestarts_ = maxR;
384 shared_->globalR = maxR ? rs : ScheduleStrategy::none();
385 shared_->maxConflict = shared_->globalR.current();
386 }
387
numThreads() const388 uint32 ParallelSolve::numThreads() const { return shared_->workSem.parties(); }
389
allocThread(uint32 id,Solver & s)390 void ParallelSolve::allocThread(uint32 id, Solver& s) {
391 if (!thread_) {
392 uint32 n = numThreads();
393 thread_ = new ParallelHandler*[n];
394 std::fill(thread_, thread_+n, static_cast<ParallelHandler*>(0));
395 }
396 size_t sz = ((sizeof(ParallelHandler)+63) / 64) * 64;
397 thread_[id] = new (alignedAlloc(sz, 64)) ParallelHandler(*this, s);
398 }
399
destroyThread(uint32 id)400 void ParallelSolve::destroyThread(uint32 id) {
401 if (thread_ && thread_[id]) {
402 assert(!thread_[id]->joinable() && "Shutdown not completed!");
403 thread_[id]->~ParallelHandler();
404 alignedFree(thread_[id]);
405 thread_[id] = 0;
406 if (id == masterId) {
407 delete [] thread_;
408 thread_ = 0;
409 }
410 }
411 }
412
reportProgress(const Event & ev) const413 inline void ParallelSolve::reportProgress(const Event& ev) const {
414 return shared_->ctx->report(ev);
415 }
reportProgress(const Solver & s,const char * msg) const416 inline void ParallelSolve::reportProgress(const Solver& s, const char* msg) const {
417 return shared_->ctx->report(msg, &s);
418 }
419
420 // joins with and destroys all active threads
// Joins all client threads (ids 1..nextId-1), destroys their handlers, and
// records the winner in the shared context: the lowest id among handlers
// flagged as winner (this assumes masterId is 0, as the client loop starting
// at 1 suggests), or UINT32_MAX if there is none. The master's handler is kept;
// it is detached later in doDetach().
// Returns the master's error() unless the search was interrupted, in which
// case the shared errorCode is returned.
int ParallelSolve::joinThreads() {
	uint32 winner = thread_[masterId]->winner() ? uint32(masterId) : UINT32_MAX;
	// detach master only after all client threads are done
	for (uint32 i = 1, end = shared_->nextId; i != end; ++i) {
		thread_[i]->join();
		if (thread_[i]->winner() && i < winner) {
			winner = i;
		}
		Solver* s = &thread_[i]->solver();
		reportProgress(*s, "joined");
		destroyThread(i);
		reportProgress(*s, "destroyed");
	}
	// commit a completed search to the enumerator
	if (shared_->complete()) {
		enumerator().commitComplete();
	}
	thread_[masterId]->handleTerminateMessage();
	shared_->ctx->setWinner(winner);
	shared_->nextId = 1; // no client threads left
	shared_->syncT.stop();
	reportProgress(MessageEvent(*shared_->ctx->master(), "TERMINATE", MessageEvent::completed, shared_->syncT.total()));
	return !shared_->interrupt() ? thread_[masterId]->error() : shared_->errorCode;
}
444
// Starts an asynchronous (generator-based) solve: beginSolve() starts the
// client threads, and the master solver runs solveParallel() in a thread of
// its own. That thread blocks until doNext() moves the generator out of its
// start state.
void ParallelSolve::doStart(SharedContext& ctx, const LitVec& assume) {
	if (beginSolve(ctx, assume)) {
		// start computation in new thread
		shared_->generator = new SharedData::Generator();
		// NOTE(review): std::mem_fun is deprecated since C++11 and removed in C++17.
		Clasp::mt::thread x(std::mem_fun(&ParallelSolve::solveParallel), this, static_cast<uint32>(masterId));
		thread_[masterId]->setThread(x);
	}
}
// Resumes the search in generator mode and blocks until the next model is
// available or the search has finished.
// Returns value_true if a model was found; otherwise value_false if the
// search is complete, else value_free.
// Fails via POTASSCO_REQUIRE if no generator was set up by doStart().
int ParallelSolve::doNext(int) {
	POTASSCO_REQUIRE(shared_->generator.get(), "Invalid operation");
	// NOTE(review): state is read here without holding genM.
	int s = shared_->generator->state;
	if (s != SharedData::Generator::done) {
		// let the master continue and wait until it leaves the search state
		shared_->generator->notify(SharedData::Generator::search);
		if ((s = shared_->generator->waitWhile(SharedData::Generator::search)) == SharedData::Generator::model) {
			return value_true;
		}
	}
	return shared_->complete() ? value_false : value_free;
}
// Stops the current solve operation: joins all threads (stopping the master
// thread first in generator mode), removes the distributor, and rethrows an
// error recorded by some thread as an exception (std::logic_error,
// std::runtime_error, or std::bad_alloc).
// Returns immediately if no client thread was started (nextId <= 1).
// NOTE(review): if beginSolve() reduced the concurrency to 1, nextId stays 1,
// so in generator mode this early return skips joining the master's own
// thread - verify that this case is handled elsewhere.
void ParallelSolve::doStop() {
	if (shared_->nextId <= 1) { return; }
	reportProgress(*shared_->ctx->master(), "joining with other threads");
	if (shared_->generator.get()) {
		// generator mode: the master runs in its own thread - stop and join it first
		shared_->setControl(SharedData::terminate_flag);
		shared_->generator->notify(SharedData::Generator::done);
		thread_[masterId]->join();
	}
	int err = joinThreads();
	shared_->generator = 0;
	shared_->ctx->distributor.reset(0);
	// translate the error code returned by joinThreads() into an exception
	switch(err) {
		case 0: break;
		case LogicError: throw std::logic_error(shared_->msg.c_str());
		case RuntimeError: throw std::runtime_error(shared_->msg.c_str());
		case OutOfMemory: throw std::bad_alloc();
		default: throw std::runtime_error(shared_->msg.c_str());
	}
}
483
doDetach()484 void ParallelSolve::doDetach() {
485 // detach master only after all client threads are done
486 thread_[masterId]->detach(*shared_->ctx, shared_->interrupt());
487 destroyThread(masterId);
488 }
489
490 // Entry point for master solver
doSolve(SharedContext & ctx,const LitVec & path)491 bool ParallelSolve::doSolve(SharedContext& ctx, const LitVec& path) {
492 if (beginSolve(ctx, path)) {
493 solveParallel(masterId);
494 ParallelSolve::doStop();
495 }
496 return !shared_->complete();
497 }
498
499 // main solve loop executed by all threads
// Runs the solve loop for the solver with the given id until the search is
// terminated or the thread fails: it repeatedly fetches a guiding path via
// requestWork() and solves it. Exceptions are turned into error codes via
// exception(). On exit, the thread leaves the work barrier, merges its
// statistics, and (non-master threads only) detaches from the shared context.
void ParallelSolve::solveParallel(uint32 id) {
	Solver& s = thread_[id]->solver();
	SolverStats agg;
	PathPtr a(0);
	// generator mode: the master waits until doNext() requests the first search
	if (id == masterId && shared_->generator.get()) {
		shared_->generator->waitWhile(SharedData::Generator::start);
	}
	try {
		// establish solver<->handler connection and attach to shared context
		// should this fail because of an initial conflict, we'll terminate
		// in requestWork.
		thread_[id]->attach(*shared_->ctx);
		BasicSolve solve(s, limits());
		agg.enable(s.stats);
		for (GpType t; requestWork(s, a);) {
			// accumulate statistics of the previous guiding path
			agg.accu(s.stats);
			s.stats.reset();
			// owned paths and splitting mode use gp_split, shared initial paths gp_fixed
			thread_[id]->setGpType(t = ((a.is_owner() || modeSplit_) ? gp_split : gp_fixed));
			// value_free: search on this path stopped without a result
			if (enumerator().start(s, *a, a.is_owner()) && thread_[id]->solveGP(solve, t, shared_->maxConflict) == value_free) {
				terminate(s, false);
			}
			s.clearStopConflict();
			s.undoUntil(0);
			enumerator().end(s);
		}
	}
	catch (const std::bad_alloc&) { exception(id, a, OutOfMemory, "bad alloc"); }
	catch (const std::logic_error& e) { exception(id, a, LogicError, e.what()); }
	catch (const std::runtime_error& e) { exception(id, a, RuntimeError, e.what()); }
	catch (const std::exception& e) { exception(id, a, RuntimeError, e.what()); }
	catch (...) { exception(id, a, UnknownError, "unknown"); }
	assert(shared_->terminate() || thread_[id]->error());
	// this thread is leaving
	// (removeParty() returns the number of parties before the removal)
	int active = shared_->workSem.removeParty(shared_->terminate());
	// update stats
	s.stats.accu(agg);
	if (id != masterId) {
		// remove solver<->handler connection and detach from shared context
		// note: since detach can change the problem, we must not yet detach the master
		// because some client might still be in the middle of an attach operation
		thread_[id]->detach(*shared_->ctx, shared_->interrupt());
		s.stats.addCpuTime(ThreadTime::getTime());
	}
	// the last thread to leave signals the end of the search to doNext()
	if (active == 1 && shared_->generator.get()) {
		shared_->generator->notify(SharedData::Generator::done);
	}
}
547
exception(uint32 id,PathPtr & path,ErrorCode e,const char * what)548 void ParallelSolve::exception(uint32 id, PathPtr& path, ErrorCode e, const char* what) {
549 try {
550 if (!thread_[id]->setError(e) || e != OutOfMemory || shared_->workSem.active()) {
551 ParallelSolve::doInterrupt();
552 if (shared_->errorSet.fetch_or(bit_mask<uint64>(id)) == 0) {
553 shared_->errorCode = e;
554 shared_->msg.appendFormat("[%u]: %s", id, what);
555 }
556 }
557 else if (path.get() && shared_->allowSplit()) {
558 shared_->workQ.push(path.release());
559 shared_->workSem.up();
560 }
561 reportProgress(thread_[id]->solver(), e == OutOfMemory ? "Thread failed with out of memory" : "Thread failed with error");
562 }
563 catch(...) { ParallelSolve::doInterrupt(); }
564 }
565
566 // forced termination from outside
doInterrupt()567 bool ParallelSolve::doInterrupt() {
568 // do not notify blocked threads to avoid possible
569 // deadlock in semaphore!
570 shared_->postMessage(SharedData::msg_interrupt, false);
571 return true;
572 }
573
574 // tries to get new work for the given solver
// On success, out holds the new guiding path (without ownership if it is the
// initial path), s has been simplified at the root level, and true is returned.
// Returns false once the search is terminated.
bool ParallelSolve::requestWork(Solver& s, PathPtr& out) {
	// a path obtained in an earlier iteration is kept and retried if simplify() failed
	const LitVec* a = 0;
	for (int popped = 0; !shared_->terminate();) {
		// only clear path and stop conflict - we don't propagate() here
		// because we would then have to handle any eventual conflicts
		if (++popped == 1 && !s.popRootLevel(s.rootLevel())) {
			// s has a real top-level conflict - problem is unsat
			terminate(s, true);
		}
		else if (shared_->synchronize()) {
			// a synchronize request is active - we are fine with
			// this but did not yet have a chance to react on it
			waitOnSync(s);
		}
		else if (a || (a = shared_->requestWork(s)) != 0) {
			assert(s.decisionLevel() == 0);
			// got new work from work-queue
			out = a;
			// do not take over ownership of initial gp!
			if (a == shared_->path) { out.release(); }
			// propagate any new facts before starting new work
			if (s.simplify()) { return true; }
			// s now has a conflict - either an artificial stop conflict
			// or a real conflict - we'll handle it in the next iteration
			// via the call to popRootLevel()
			popped = 0;
		}
		else if (!shared_->allowSplit() || !shared_->synchronize()) {
			// no work left - quitting time?
			terminate(s, true);
		}
	}
	return false;
}
609
// Called when solver s ends the search from inside the algorithm;
// complete signals whether s exhausted its part of the search space.
terminate(Solver & s,bool complete)612 void ParallelSolve::terminate(Solver& s, bool complete) {
613 if (!shared_->terminate()) {
614 if (enumerator().tentative() && complete) {
615 if (shared_->setControl(SharedData::sync_flag|SharedData::complete_flag)) {
616 thread_[s.id()]->setWinner();
617 reportProgress(MessageEvent(s, "SYNC", MessageEvent::sent));
618 }
619 }
620 else {
621 reportProgress(MessageEvent(s, "TERMINATE", MessageEvent::sent));
622 shared_->postMessage(SharedData::msg_terminate, true);
623 thread_[s.id()]->setWinner();
624 if (complete) { shared_->setControl(SharedData::complete_flag); }
625 }
626 }
627 }
628
629 // handles an active synchronization request
630 // returns true to signal that s should restart otherwise false
// Blocks s at the workSem barrier until all threads have arrived. The last
// thread to arrive (the leader) completes the request: it resets pending
// work/restart requests, updates the global restart limit, re-initializes
// the work queue, commits a tentatively complete search, clears the
// sync-related flags, and finally wakes all other threads.
// Returns true if the search is terminated or if s had a guiding path and
// restart_abandoned_flag is not set.
// NOTE(review): restart_abandoned_flag is set below but cleared again by the
// leader before the other threads are woken, so the final check never sees
// it set - verify that this is intended.
bool ParallelSolve::waitOnSync(Solver& s) {
	if (!thread_[s.id()]->handleRestartMessage()) {
		shared_->setControl(SharedData::cancel_restart_flag);
	}
	bool hasPath = thread_[s.id()]->hasPath();
	bool tentative= enumerator().tentative();
	if (shared_->workSem.wait()) {
		// last man standing - complete synchronization request
		shared_->workReq = 0;
		shared_->restartReq = 0;
		bool restart = shared_->hasControl(SharedData::restart_flag);
		bool init = true;
		if (restart) {
			// restart only if still allowed and no thread cancelled the request
			init = shared_->allowRestart() && !shared_->hasControl(SharedData::cancel_restart_flag);
			if (init) { shared_->globalR.next(); }
			// no further global restarts once maxRestarts_ is reached
			shared_->maxConflict = shared_->allowRestart() && shared_->globalR.idx < maxRestarts_
				? shared_->globalR.current()
				: UINT64_MAX;
		}
		else if (shared_->maxConflict != UINT64_MAX && !shared_->allowRestart()) {
			shared_->maxConflict = UINT64_MAX;
		}
		if (init) { initQueue(); }
		else { shared_->setControl(SharedData::restart_abandoned_flag); }
		if (tentative && shared_->complete()) {
			// commit the tentative completion; if the enumerator rejects it, continue searching
			if (enumerator().commitComplete()) { shared_->setControl(SharedData::terminate_flag); }
			else { shared_->modCount = uint32(0); shared_->clearControl(SharedData::complete_flag); }
		}
		shared_->clearControl(SharedData::msg_split | SharedData::msg_sync_restart | SharedData::restart_abandoned_flag | SharedData::cancel_restart_flag);
		shared_->syncT.lap();
		reportProgress(MessageEvent(s, "SYNC", MessageEvent::completed, shared_->syncT.elapsed()));
		// wake up all blocked threads
		shared_->workSem.reset();
	}
	return shared_->terminate() || (hasPath && !shared_->hasControl(SharedData::restart_abandoned_flag));
}
667
// If the guiding-path (splitting) scheme is active, only one thread
// can start with the initial guiding path (gp, typically empty), and
// that thread must then split up the search space dynamically.
// Otherwise, all threads can start with the initial gp.
// TODO: heuristic for initial splits?
initQueue()674 void ParallelSolve::initQueue() {
675 shared_->clearQueue();
676 if (shared_->allowSplit() && modeSplit_ && !enumerator().supportsSplitting(*shared_->ctx)) {
677 shared_->ctx->warn("Selected strategies imply Mode=compete.");
678 shared_->clearControl(SharedData::allow_split_flag);
679 shared_->setControl(SharedData::forbid_restart_flag);
680 modeSplit_ = false;
681 }
682 shared_->initVec = UINT64_MAX;
683 assert(shared_->allowSplit() || shared_->hasControl(SharedData::forbid_restart_flag));
684 }
685
686 // adds work to the work-queue
pushWork(LitVec * v)687 void ParallelSolve::pushWork(LitVec* v) {
688 assert(v);
689 shared_->pushWork(v);
690 }
691
692 // called whenever some solver proved unsat
// Commits the unsatisfiability of s's current (sub)problem to the enumerator.
// Returns false without committing if the enumerator stops on unsat
// (unsat_stop) or a terminate/sync request is pending; otherwise returns
// enumerator().commitUnsat(s). With unsat_sync the commit is serialized via
// modelM. For a non-disjoint path, a successful commit increments modCount
// (other threads pick this up in integrateModels()) and reports an improved
// lower bound; a failed commit terminates the search as complete.
bool ParallelSolve::commitUnsat(Solver& s) {
	const int supUnsat = enumerator().unsatType();
	if (supUnsat == Enumerator::unsat_stop || shared_->terminate() || shared_->synchronize()) {
		return false;
	}
	unique_lock<mutex> lock(shared_->modelM, defer_lock_t());
	if (supUnsat == Enumerator::unsat_sync) {
		lock.lock();
	}
	bool result = enumerator().commitUnsat(s);
	if (lock.owns_lock()) {
		lock.unlock();
	}
	if (!thread_[s.id()]->disjointPath()) {
		if (result) {
			++shared_->modCount;
			if (s.lower.bound > 0) {
				// lower_ is shared - compare, update, and report it under modelM
				lock.lock();
				if (s.lower.bound > shared_->lower_.bound || s.lower.level > shared_->lower_.level) {
					shared_->lower_ = s.lower;
					reportUnsat(s);
					++shared_->modCount;
				}
				lock.unlock();
			}
		}
		else { terminate(s, true); }
	}
	return result;
}
723
724 // called whenever some solver has found a model
// Commits the model of s while holding modelM, so models are processed one at
// a time. The model is only committed if the handler still accepts it
// (isModelLocked()), no terminate request is pending, and the enumerator
// accepts it. In generator mode the model is handed over to the doNext()
// caller; otherwise it is reported and the search is terminated if
// reportModel() asks to stop.
// Returns false if s should stop searching.
bool ParallelSolve::commitModel(Solver& s) {
	// grab lock - models must be processed sequentially
	// in order to simplify printing and to avoid duplicates
	// in all non-trivial enumeration modes
	bool stop = false;
	{lock_guard<mutex> lock(shared_->modelM);
	// first check if the model is still valid once all
	// information is integrated into the solver
	if (thread_[s.id()]->isModelLocked(s) && (stop=shared_->terminate()) == false && enumerator().commitModel(s)) {
		if (enumerator().lastModel().num == 1 && !enumerator().supportsRestarts()) {
			// switch to backtracking based splitting algorithm
			// the solver's gp will act as the root for splitting and is
			// from now on disjoint from all other gps
			shared_->setControl(SharedData::forbid_restart_flag | SharedData::allow_split_flag);
			thread_[s.id()]->setGpType(gp_split);
			enumerator().setDisjoint(s, true);
		}
		if (shared_->generator.get()) {
			// generator mode: block until the doNext() caller resumes the search
			shared_->generator->pushModel();
		}
		else if ((stop = !reportModel(s)) == true) {
			// must be called while holding the lock - otherwise
			// we have a race condition with solvers that
			// are currently blocking on the mutex and we could enumerate
			// more models than requested by the user
			terminate(s, !moreModels(s));
		}
		// signal the new model to other threads (see integrateModels())
		++shared_->modCount;
	}}
	return !stop;
}
756
hasErrors() const757 uint64 ParallelSolve::hasErrors() const {
758 return shared_->errorSet != 0u;
759 }
interrupted() const760 bool ParallelSolve::interrupted() const {
761 return shared_->interrupt();
762 }
resetSolve()763 void ParallelSolve::resetSolve() {
764 shared_->control = 0;
765 }
enableInterrupts()766 void ParallelSolve::enableInterrupts() {}
767 // updates s with new messages and uses s to create a new guiding path
768 // if necessary and possible
// Returns false if s must stop its current search: either a terminate
// request is pending, or s took part in a synchronization after which it has
// to give up its path. In both cases a stop conflict is set on s.
// Otherwise returns true, possibly after splitting off part of s's path
// in response to a pending split request.
bool ParallelSolve::handleMessages(Solver& s) {
	// check if there are new messages for s
	if (!shared_->hasMessage()) {
		// nothing to do
		return true;
	}
	ParallelHandler* h = thread_[s.id()];
	if (shared_->terminate()) {
		reportProgress(MessageEvent(s, "TERMINATE", MessageEvent::received));
		h->handleTerminateMessage();
		s.setStopConflict();
		return false;
	}
	if (shared_->synchronize()) {
		reportProgress(MessageEvent(s, "SYNC", MessageEvent::received));
		if (waitOnSync(s)) {
			s.setStopConflict();
			return false;
		}
		return true;
	}
	// split only from a disjoint path and only if some thread still needs work
	if (h->disjointPath() && s.splittable() && shared_->workReq > 0) {
		// First declare split request as handled
		// and only then do the actual split.
		// This way, we minimize the chance for
		// "over"-splitting, i.e. one split request handled
		// by more than one thread.
		shared_->aboutToSplit();
		reportProgress(MessageEvent(s, "SPLIT", MessageEvent::received));
		h->handleSplitMessage();
		enumerator().setDisjoint(s, true);
	}
	return true;
}
803
integrateModels(Solver & s,uint32 & upCount)804 bool ParallelSolve::integrateModels(Solver& s, uint32& upCount) {
805 uint32 gCount = shared_->modCount;
806 return gCount == upCount || (enumerator().update(s) && (upCount = gCount) == gCount);
807 }
808
requestRestart()809 void ParallelSolve::requestRestart() {
810 if (shared_->allowRestart() && ++shared_->restartReq == numThreads()) {
811 shared_->postMessage(SharedData::msg_sync_restart, true);
812 }
813 }
814
createSolveObject() const815 SolveAlgorithm* ParallelSolveOptions::createSolveObject() const {
816 return numSolver() > 1 ? new ParallelSolve(*this) : BasicSolveOptions::createSolveObject();
817 }
818 ////////////////////////////////////////////////////////////////////////////////////
819 // ParallelHandler
820 /////////////////////////////////////////////////////////////////////////////////////////
ParallelHandler(ParallelSolve & ctrl,Solver & s)821 ParallelHandler::ParallelHandler(ParallelSolve& ctrl, Solver& s)
822 : MessageHandler()
823 , ctrl_(&ctrl)
824 , solver_(&s)
825 , received_(0)
826 , recEnd_(0)
827 , intEnd_(0)
828 , error_(0)
829 , win_(0)
830 , up_(0) {
831 this->next = this;
832 }
833
~ParallelHandler()834 ParallelHandler::~ParallelHandler() { clearDB(0); delete [] received_; }
835
836 // adds this as post propagator to its solver and attaches the solver to ctx.
attach(SharedContext & ctx)837 bool ParallelHandler::attach(SharedContext& ctx) {
838 assert(solver_);
839 gp_.reset();
840 error_ = 0;
841 win_ = 0;
842 up_ = 0;
843 act_ = 0;
844 lbd_ = solver_->searchConfig().reduce.strategy.glue != 0;
845 next = 0;
846 if (!received_ && ctx.distributor.get()) {
847 received_ = new SharedLiterals*[RECEIVE_BUFFER_SIZE];
848 }
849 ctx.report("attach", solver_);
850 solver_->addPost(this);
851 return ctx.attach(solver_->id());
852 }
853
854 // removes this from the list of post propagators of its solver and detaches the solver from ctx.
detach(SharedContext & ctx,bool)855 void ParallelHandler::detach(SharedContext& ctx, bool) {
856 handleTerminateMessage();
857 ctx.report("detach", solver_);
858 if (solver_->sharedContext() == &ctx) {
859 clearDB(!error() ? solver_ : 0);
860 ctx.report("detached db", solver_);
861 ctx.detach(*solver_, error() != 0);
862 ctx.report("detached ctx", solver_);
863 }
864 }
865
setError(int code)866 bool ParallelHandler::setError(int code) {
867 error_ = code;
868 return thread_.joinable() && !winner();
869 }
870
clearDB(Solver * s)871 void ParallelHandler::clearDB(Solver* s) {
872 for (ClauseDB::iterator it = integrated_.begin(), end = integrated_.end(); it != end; ++it) {
873 ClauseHead* c = static_cast<ClauseHead*>(*it);
874 if (s) { s->addLearnt(c, c->size(), Constraint_t::Other); }
875 else { c->destroy(); }
876 }
877 integrated_.clear();
878 intEnd_= 0;
879 for (uint32 i = 0; i != recEnd_; ++i) { received_[i]->release(); }
880 recEnd_= 0;
881 }
882
// Solves the current guiding path of solve.solver() with gp type t.
// restart is passed to gp_.reset(); propagateFixpoint() requests a global
// restart once s.stats.conflicts reaches gp_.restart.
// Each result is committed to the controller: the loop continues while
// commitModel() (for value_true) or commitUnsat() (for value_false, which also
// resets the guiding path) returns true. Returns the result of the last solve.
ValueRep ParallelHandler::solveGP(BasicSolve& solve, GpType t, uint64 restart) {
	ValueRep res = value_free;
	Solver& s = solve.solver();
	bool fin = false;
	gp_.reset(restart, t);
	assert(act_ == 0);
	do {
		win_ = 0;
		// synchronize with models committed so far before (re)starting the search
		ctrl_->integrateModels(s, gp_.modCount);
		up_ = act_ = 1; // activate enumerator and bounds
		res = solve.solve();
		up_ = act_ = 0; // de-activate enumerator and bounds
		fin = true;
		if (res == value_true) { if (ctrl_->commitModel(s)) { fin = false; } }
		else if (res == value_false) { if (ctrl_->commitUnsat(s)) { fin = false; gp_.reset(restart, gp_.type); } }
	} while (!fin);
	return res;
}
901
902 // detach from solver, i.e. ignore any further messages
handleTerminateMessage()903 void ParallelHandler::handleTerminateMessage() {
904 if (this->next != this) {
905 // mark removed propagator by creating "self-loop"
906 solver_->removePost(this);
907 this->next = this;
908 }
909 }
910
911 // split-off new guiding path and add it to solve object
handleSplitMessage()912 void ParallelHandler::handleSplitMessage() {
913 assert(solver_ && "ParallelHandler::handleSplitMessage(): not attached!");
914 assert(solver_->splittable());
915 Solver& s = *solver_;
916 SingleOwnerPtr<LitVec> newPath(new LitVec());
917 s.split(*newPath);
918 ctrl_->pushWork(newPath.release());
919 }
920
// Hook for a restart message. Currently does nothing and always returns true.
bool ParallelHandler::handleRestartMessage() {
	// TODO
	// we may want to implement some heuristic, like
	// computing a local var order.
	return true;
}
927
// Removes and destroys every integrated clause whose simplify() returns true
// and compacts integrated_ in place. intEnd_ is moved back for each removed
// clause in front of it, so it keeps pointing at the same logical position.
// Always returns false. NOTE(review): by the convention used for c->simplify()
// above, this presumably means "keep this post propagator" - confirm.
bool ParallelHandler::simplify(Solver& s, bool sh) {
	ClauseDB::size_type i, j, end = integrated_.size();
	// i: read position, j: write position of the next kept clause
	for (i = j = 0; i != end; ++i) {
		Constraint* c = integrated_[i];
		if (c->simplify(s, sh)) {
			c->destroy(&s, false);
			intEnd_ -= (i < intEnd_);
		}
		else {
			integrated_[j++] = c;
		}
	}
	shrinkVecTo(integrated_, j);
	// defensive clamp: intEnd_ must never exceed the number of clauses
	if (intEnd_ > sizeVec(integrated_)) intEnd_ = sizeVec(integrated_);
	return false;
}
944
// Post propagator hook. Handles pending messages and, if updates are enabled
// (up_ != 0) and the call does not come from another post propagator, integrates
// new models and received clauses into s until a fixpoint is reached.
// Returns false if message handling, integration or propagation failed.
bool ParallelHandler::propagateFixpoint(Solver& s, PostPropagator* ctx) {
	// Check for messages and integrate any new information from
	// models/lemma exchange but only if path is setup.
	// Skip updates if called from other post propagator so that we do not
	// disturb any active propagation.
	if (int up = (ctx == 0 && up_ != 0)) {
		// here up == 1; up_ is toggled by the solver's update mode
		up_ ^= (uint32)s.updateMode();
		// up becomes 2 (skip model integration) if the gp is not active or if
		// up_ is still set and the number of choices is not a multiple of 64
		up += (act_ == 0 || (up_ && (s.stats.choices & 63) != 0));
		// request a global restart once the conflict limit is reached, then double the limit
		if (s.stats.conflicts >= gp_.restart) { ctrl_->requestRestart(); gp_.restart *= 2; }
		for (uint32 cDL = s.decisionLevel();;) {
			// up == 1: integrate new models; up == 2: integrate received clauses
			bool ok = ctrl_->handleMessages(s) && (up > 1 ? integrate(s) : ctrl_->integrateModels(s, gp_.modCount));
			if (!ok) { return false; }
			if (cDL != s.decisionLevel()) { // cancel active propagation on cDL
				cancelPropagation();
				cDL = s.decisionLevel();
			}
			// nothing left to propagate: advance to the next step; done after step 2
			if (!s.queueSize()) { if (++up == 3) return true; }
			// NOTE(review): presumably propagates with all propagators preceding this one - confirm
			else if (!s.propagateUntil(this)){ return false; }
		}
	}
	return ctrl_->handleMessages(s);
}
967
968 // checks whether s still has a model once all
969 // information from previously found models were integrated
isModel(Solver & s)970 bool ParallelHandler::isModel(Solver& s) {
971 assert(s.numFreeVars() == 0);
972 // either no unprocessed updates or still a model after
973 // updates were integrated
974 return ctrl_->integrateModels(s, gp_.modCount)
975 && s.numFreeVars() == 0
976 && s.queueSize() == 0;
977 }
978
isModelLocked(Solver & s)979 bool ParallelHandler::isModelLocked(Solver& s) {
980 const uint32 current = gp_.modCount;
981 if (!isModel(s))
982 return false;
983 if (current == gp_.modCount)
984 return true;
985 for (PostPropagator* p = s.getPost(PostPropagator::priority_class_general); p; p = p->next) {
986 if (!p->isModel(s))
987 return false;
988 }
989 return true;
990 }
991
// Receives new clauses via s.receive() and integrates them into s. New clauses
// are appended after the recEnd_ clauses left over from a previous call.
// Locally created clauses are tracked via add(). If the integration of a clause
// fails (!ret.ok()), the not yet processed clauses are kept in received_ for a
// later call. Returns false if s has a conflict afterwards.
bool ParallelHandler::integrate(Solver& s) {
	uint32 rec = recEnd_ + s.receive(received_ + recEnd_, RECEIVE_BUFFER_SIZE - recEnd_);
	if (!rec) { return true; }
	ClauseCreator::Result ret;
	uint32 dl = s.decisionLevel(), added = 0, i = 0;
	uint32 intFlags = ctrl_->integrateFlags();
	recEnd_ = 0;
	if (lbd_) {
		intFlags |= ClauseCreator::clause_int_lbd;
	}
	do {
		ret = ClauseCreator::integrate(s, received_[i++], intFlags, Constraint_t::Other);
		// count every clause that was not reported as subsumed
		added += ret.status != ClauseCreator::status_subsumed;
		if (ret.local) { add(ret.local); }
		// statistics for asserting clauses: decision level before and after integration
		if (ret.unit()){ s.stats.addIntegratedAsserting(dl, s.decisionLevel()); dl = s.decisionLevel(); }
		// on failure move the remaining clauses to the front of the buffer (this also ends the loop)
		if (!ret.ok()) { while (i != rec) { received_[recEnd_++] = received_[i++]; } }
	} while (i != rec);
	s.stats.addIntegrated(added);
	return !s.hasConflict();
}
1012
// Tracks the locally created clause h in integrated_, which is used as a ring
// buffer: intEnd_ is the next slot and wraps to 0 when it reaches
// ctrl_->integrateGrace(). When the slot is already occupied, the old clause is
// evicted. It is handed to the solver as a learnt clause if the integration
// heuristic is off, or the clause is locked, or its activity is positive.
// Otherwise it is destroyed and the integrated statistic is decremented.
void ParallelHandler::add(ClauseHead* h) {
	if (intEnd_ < integrated_.size()) {
		// slot occupied: replace and evict the old clause
		ClauseHead* o = (ClauseHead*)integrated_[intEnd_];
		integrated_[intEnd_] = h;
		assert(o);
		if (!ctrl_->integrateUseHeuristic() || o->locked(*solver_) || o->activity().activity() > 0) {
			solver_->addLearnt(o, o->size(), Constraint_t::Other);
		}
		else {
			o->destroy(solver_, true);
			solver_->stats.removeIntegrated();
		}
	}
	else {
		integrated_.push_back(h);
	}
	if (++intEnd_ >= ctrl_->integrateGrace()) {
		intEnd_ = 0;
	}
}
1033 /////////////////////////////////////////////////////////////////////////////////////////
1034 // Distribution
1035 /////////////////////////////////////////////////////////////////////////////////////////
// Computes the peers of thread id (as a bit mask over thread ids) for the given
// topology and maxT threads:
//  - topo_all  : Distributor::initSet(maxT) with id's bit toggled, presumably all
//                other threads (NOTE(review): confirm initSet semantics)
//  - topo_ring : predecessor and successor of id on a ring of maxT threads
//  - otherwise : hypercube over the existing ids; with topo_cubex, threads whose
//                hypercube neighbours do not exist (maxT not a power of two) are
//                additionally linked to substitute nodes
// For the cube topologies the result is asserted not to contain id itself.
uint64 ParallelSolveOptions::initPeerMask(uint32 id, Integration::Topology topo, uint32 maxT) {
	if (topo == Integration::topo_all) { return Distributor::initSet(maxT) ^ Distributor::mask(id); }
	if (topo == Integration::topo_ring){
		uint32 prev = id > 0 ? id - 1 : maxT - 1;
		uint32 next = (id + 1) % maxT;
		return Distributor::mask(prev) | Distributor::mask(next);
	}
	bool ext = topo == Integration::topo_cubex;
	uint32 n = maxT;
	uint32 k = 1;
	// k = largest power of two <= n (for n >= 1)
	for (uint32 i = n / 2; i > 0; i /= 2, k *= 2) { }
	uint64 res = 0, x = 1;
	// hypercube neighbours: id with exactly one bit m flipped
	for (uint32 m = 1; m <= k; m *= 2) {
		uint32 i = m ^ id;
		if (i < n) { res |= (x << i); }
		// neighbour does not exist: in extended mode use it with the top bit k flipped as well
		else if (ext && k != m) { res |= (x << (i^k)); }
	}
	if (ext) {
		// if the counterpart of id in the top dimension (s) does not exist,
		// also link id to the existing lower-dimension neighbours of s
		uint32 s = k ^ id;
		for(uint32 m = 1; m < k && s >= n; m *= 2) {
			uint32 i = m ^ s;
			if (i < n) { res |= (x << i); }
		}
	}
	assert( (res & (x<<id)) == 0 );
	return res;
}
1063 /////////////////////////////////////////////////////////////////////////////////////////
1064 // GlobalDistribution
1065 /////////////////////////////////////////////////////////////////////////////////////////
GlobalDistribution(const Policy & p,uint32 maxT,uint32 topo)1066 GlobalDistribution::GlobalDistribution(const Policy& p, uint32 maxT, uint32 topo) : Distributor(p), queue_(0) {
1067 typedef ParallelSolveOptions::Integration::Topology Topology;
1068 assert(maxT <= ParallelSolveOptions::supportedSolvers());
1069 Topology t = static_cast<Topology>(topo);
1070 queue_ = new Queue(maxT);
1071 threadId_ = (ThreadInfo*)alignedAlloc((maxT * sizeof(ThreadInfo)), 64);
1072 for (uint32 i = 0; i != maxT; ++i) {
1073 new (&threadId_[i]) ThreadInfo;
1074 threadId_[i].id = queue_->addThread();
1075 threadId_[i].peerMask = ParallelSolveOptions::initPeerMask(i, t, maxT);
1076 }
1077 }
// Destroys the distribution; queued clauses, per-thread data and the queue
// itself are released by release().
GlobalDistribution::~GlobalDistribution() {
	// every ThreadInfo is expected to fill exactly 64 bytes of the aligned array
	// (NOTE(review): presumably to keep each thread's entry on its own cache line)
	static_assert(sizeof(ThreadInfo) == 64, "Invalid size");
	release();
}
release()1082 void GlobalDistribution::release() {
1083 if (queue_) {
1084 for (uint32 i = 0; i != queue_->maxThreads(); ++i) {
1085 Queue::ThreadId& id = getThreadId(i);
1086 for (ClausePair n; queue_->tryConsume(id, n); ) {
1087 if (n.sender != i) { n.lits->release(); }
1088 }
1089 threadId_[i].~ThreadInfo();
1090 }
1091 delete queue_;
1092 queue_ = 0;
1093 alignedFree(threadId_);
1094 }
1095 }
publish(const Solver & s,SharedLiterals * n)1096 void GlobalDistribution::publish(const Solver& s, SharedLiterals* n) {
1097 assert(n->refCount() >= (queue_->maxThreads()-1));
1098 queue_->publish(ClausePair(s.id(), n), getThreadId(s.id()));
1099 }
receive(const Solver & in,SharedLiterals ** out,uint32 maxn)1100 uint32 GlobalDistribution::receive(const Solver& in, SharedLiterals** out, uint32 maxn) {
1101 uint32 r = 0;
1102 Queue::ThreadId& id = getThreadId(in.id());
1103 uint64 peers = getPeerMask(in.id());
1104 for (ClausePair n; r != maxn && queue_->tryConsume(id, n); ) {
1105 if (n.sender != in.id()) {
1106 if (inSet(peers, n.sender)) { out[r++] = n.lits; }
1107 else if (n.lits->size() == 1){ out[r++] = n.lits; }
1108 else { n.lits->release(); }
1109 }
1110 }
1111 return r;
1112 }
1113
1114 /////////////////////////////////////////////////////////////////////////////////////////
1115 // LocalDistribution
1116 /////////////////////////////////////////////////////////////////////////////////////////
LocalDistribution(const Policy & p,uint32 maxT,uint32 topo)1117 LocalDistribution::LocalDistribution(const Policy& p, uint32 maxT, uint32 topo) : Distributor(p), thread_(0), numThread_(0) {
1118 typedef ParallelSolveOptions::Integration::Topology Topology;
1119 assert(maxT <= ParallelSolveOptions::supportedSolvers());
1120 Topology t = static_cast<Topology>(topo);
1121 thread_ = new ThreadData*[numThread_ = maxT];
1122 size_t sz = ((sizeof(ThreadData) + 63) / 64) * 64;
1123 for (uint32 i = 0; i != maxT; ++i) {
1124 ThreadData* ti = new (alignedAlloc(sz, 64)) ThreadData;
1125 ti->received.init(&ti->sentinal);
1126 ti->peers = ParallelSolveOptions::initPeerMask(i, t, maxT);
1127 ti->free = 0;
1128 thread_[i]= ti;
1129 }
1130 }
// Destroys all per-thread data, releasing clauses still waiting in each
// thread's receive queue, then frees every node block allocated by allocNode().
LocalDistribution::~LocalDistribution() {
	while (numThread_) {
		ThreadData* ti = thread_[--numThread_];
		thread_[numThread_] = 0;
		// release clauses that were delivered to this thread but never received
		for (QNode* n; (n = ti->received.pop()) != 0; ) {
			static_cast<SharedLiterals*>(n->data)->release();
		}
		// ThreadData was constructed with placement new on aligned memory
		ti->~ThreadData();
		alignedFree(ti);
	}
	// node memory belongs to the blocks pushed in allocNode(), not to the free lists
	for (MPSCPtrQueue::RawNode* n; (n = blocks_.tryPop()) != 0; ) {
		alignedFree(n);
	}
	delete [] thread_;
}
1146
freeNode(uint32 tId,QNode * n) const1147 void LocalDistribution::freeNode(uint32 tId, QNode* n) const {
1148 if (n != &thread_[tId]->sentinal) {
1149 n->next = thread_[tId]->free;
1150 thread_[tId]->free = n;
1151 }
1152 }
1153
// Returns a queue node carrying clause, taken from the free list of thread tId.
// If that list is empty, a new 64-byte aligned block of nNodes nodes is allocated.
// Node 0 links the block into blocks_, so that the destructor can free it.
// Nodes [1, nNodes) form the new free list, and the loop then retries.
// NOTE(review): the free list of tId is presumably only touched by thread tId
// (publish() allocates for the sender, receive() frees for the receiver) - confirm.
LocalDistribution::QNode* LocalDistribution::allocNode(uint32 tId, SharedLiterals* clause) {
	for (ThreadData* td = thread_[tId];;) {
		if (QNode* n = td->free) {
			td->free = static_cast<QNode*>(static_cast<MPSCPtrQueue::RawNode*>(n->next));
			n->data = clause;
			return n;
		}
		// alloc a new block of node;
		const uint32 nNodes = 128;
		QNode* raw = (QNode*)alignedAlloc(sizeof(QNode) * nNodes, 64);
		// add nodes [1, nNodes) to free list
		for (uint32 i = 1; i != nNodes-1; ++i) {
			raw[i].next = &raw[i+1];
		}
		raw[nNodes-1].next = 0;
		td->free = &raw[1];
		// use first node to link to block list
		blocks_.push(raw);
	}
}
1174
publish(const Solver & s,SharedLiterals * n)1175 void LocalDistribution::publish(const Solver& s, SharedLiterals* n) {
1176 assert(n->refCount() >= (numThread_-1));
1177 uint32 sender = s.id();
1178 uint32 size = n->size();
1179 uint32 decRef = 0;
1180 for (uint32 i = 0; i != numThread_; ++i) {
1181 if (i == sender) { continue; }
1182 if (size > 1 && !inSet(thread_[i]->peers, sender)) { ++decRef; }
1183 else {
1184 QNode* node = allocNode(sender, n);
1185 thread_[i]->received.push(node);
1186 }
1187 }
1188 if (decRef) { n->release(decRef); }
1189 }
receive(const Solver & in,SharedLiterals ** out,uint32 maxn)1190 uint32 LocalDistribution::receive(const Solver& in, SharedLiterals** out, uint32 maxn) {
1191 MPSCPtrQueue& q = thread_[in.id()]->received;
1192 QNode* n = 0;
1193 uint32 r = 0;
1194 while (r != maxn && (n = q.pop()) != 0) {
1195 out[r++] = static_cast<SharedLiterals*>(n->data);
1196 freeNode(in.id(), n);
1197 }
1198 return r;
1199 }
1200
1201 } } // namespace Clasp::mt
1202
1203