1 //
2 // Copyright (c) 2010-2017 Benjamin Kaufmann
3 //
4 // This file is part of Clasp. See http://www.cs.uni-potsdam.de/clasp/
5 //
6 // Permission is hereby granted, free of charge, to any person obtaining a copy
7 // of this software and associated documentation files (the "Software"), to
8 // deal in the Software without restriction, including without limitation the
9 // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
10 // sell copies of the Software, and to permit persons to whom the Software is
11 // furnished to do so, subject to the following conditions:
12 //
13 // The above copyright notice and this permission notice shall be included in
14 // all copies or substantial portions of the Software.
15 //
16 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
21 // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
22 // IN THE SOFTWARE.
23 //
24 #include <clasp/mt/parallel_solve.h>
25 #include <clasp/solver.h>
26 #include <clasp/clause.h>
27 #include <clasp/enumerator.h>
28 #include <clasp/util/timer.h>
29 #include <clasp/minimize_constraint.h>
30 #include <clasp/mt/mutex.h>
31 #include <potassco/string_convert.h>
32 namespace Clasp { namespace mt {
33 /////////////////////////////////////////////////////////////////////////////////////////
34 // BarrierSemaphore
35 /////////////////////////////////////////////////////////////////////////////////////////
36 // A combination of a barrier and a semaphore
37 class BarrierSemaphore {
38 public:
BarrierSemaphore(int counter=0,int maxParties=1)39 	explicit BarrierSemaphore(int counter = 0, int maxParties = 1) : counter_(counter), active_(maxParties) {}
40 	// Initializes this object
41 	// PRE: no thread is blocked on the semaphore
42 	//      (i.e. internal counter is >= 0)
43 	// NOTE: not thread-safe
unsafe_init(int counter=0,int maxParties=1)44 	void unsafe_init(int counter = 0, int maxParties = 1) {
45 		counter_ = counter;
46 		active_  = maxParties;
47 	}
48 	// Returns the current semaphore counter.
counter()49 	int  counter()   { lock_guard<mutex> lock(semMutex_); return counter_; }
50 	// Returns the number of parties required to trip this barrier.
parties()51 	int  parties()   { lock_guard<mutex> lock(semMutex_); return active_;  }
52 	// Returns true if all parties are waiting at the barrier
active()53 	bool active()    { lock_guard<mutex> lock(semMutex_); return unsafe_active(); }
54 
55 	// barrier interface
56 
57 	// Increases the barrier count, i.e. the number of
58 	// parties required to trip this barrier.
addParty()59 	void addParty() {
60 		lock_guard<mutex> lock(semMutex_);
61 		++active_;
62 	}
63 	// Decreases the barrier count and resets the barrier
64 	// if reset is true.
65 	// PRE: the thread does not itself wait on the barrier
removeParty(bool reset)66 	int removeParty(bool reset) {
67 		unique_lock<mutex> lock(semMutex_);
68 		assert(active_ > 0);
69 		int res = active_--;
70 		if      (reset)           { unsafe_reset(0); }
71 		else if (unsafe_active()) { counter_ = -active_; lock.unlock(); semCond_.notify_one(); }
72 		return res;
73 	}
74 	// Waits until all parties have arrived, i.e. called wait.
75 	// Exactly one of the parties will receive a return value of true,
76 	// the others will receive a value of false.
77 	// Applications shall use this value to designate one thread as the
78 	// leader that will eventually reset the barrier thereby unblocking the other threads.
wait()79 	bool wait() {
80 		unique_lock<mutex> lock(semMutex_);
81 		if (--counter_ >= 0) { counter_ = -1; }
82 		return unsafe_wait(lock);
83 	}
84 	// Resets the barrier and unblocks any waiting threads.
reset(uint32 semCount=0)85 	void reset(uint32 semCount = 0) {
86 		lock_guard<mutex> lock(semMutex_);
87 		unsafe_reset(semCount);
88 	}
89 	// semaphore interface
90 
91 	// Decrement the semaphore's counter.
92 	// If the counter is zero or less prior to the call
93 	// the calling thread is suspended.
94 	// Returns false to signal that all but the calling thread
95 	// are currently blocked.
down()96 	bool down() {
97 		unique_lock<mutex> lock(semMutex_);
98 		return down(lock);
99 	}
100 	// LOW-LEVEL version of down
down(unique_lock<mutex> & m)101 	bool down(unique_lock<mutex>& m) {
102 		assert(m.owns_lock());
103 		if (--counter_ >= 0) { return true; }
104 		return !unsafe_wait(m);
105 	}
106 	// Increments the semaphore's counter and resumes
107 	// one thread which has called down() if the counter
108 	// was less than zero prior to the call.
up()109 	void up() {
110 		unique_lock<mutex> lock(semMutex_);
111 		up(lock);
112 	}
113 	// LOW-LEVEL version of up
up(unique_lock<mutex> & m,bool transferLock=true)114 	void up(unique_lock<mutex>& m, bool transferLock = true) {
115 		assert(m.owns_lock());
116 		if (++counter_ < 1) {
117 			if (transferLock) { m.unlock(); }
118 			semCond_.notify_one();
119 		}
120 	}
toMutex()121 	mutex& toMutex() { return semMutex_; }
122 private:
123 	BarrierSemaphore(const BarrierSemaphore&);
124 	BarrierSemaphore& operator=(const BarrierSemaphore&);
125 	typedef condition_variable cv;
unsafe_active() const126 	bool    unsafe_active() const { return -counter_ >= active_; }
unsafe_reset(uint32 semCount)127 	void    unsafe_reset(uint32 semCount) {
128 		int prev = counter_;
129 		counter_ = semCount;
130 		if (prev < 0) { semCond_.notify_all(); }
131 	}
132 	// Returns true for the leader, else false
unsafe_wait(unique_lock<mutex> & lock)133 	bool    unsafe_wait(unique_lock<mutex>& lock) {
134 		assert(counter_ < 0);
135 		// don't put the last thread to sleep!
136 		if (!unsafe_active()) {
137 			semCond_.wait(lock);
138 		}
139 		return unsafe_active();
140 	}
141 	cv    semCond_;  // waiting threads
142 	mutex semMutex_; // mutex for updating counter
143 	int   counter_;  // semaphore's counter
144 	int   active_;   // number of active threads
145 };
146 /////////////////////////////////////////////////////////////////////////////////////////
// ParallelSolve::SharedData
148 /////////////////////////////////////////////////////////////////////////////////////////
149 struct ParallelSolve::SharedData {
150 	typedef PodQueue<const LitVec*> Queue;
151 	enum MsgFlag {
152 		terminate_flag        = 1u, sync_flag  = 2u,  split_flag    = 4u,
153 		restart_flag          = 8u, complete_flag = 16u,
154 		interrupt_flag        = 32u, // set on terminate from outside
155 		allow_split_flag      = 64u, // set if splitting mode is active
156 		forbid_restart_flag   = 128u,// set if restarts are no longer allowed
157 		cancel_restart_flag   = 256u,// set if current restart request was cancelled by some thread
158 		restart_abandoned_flag= 512u,// set to signal that threads must not give up their gp
159 	};
160 	enum Message {
161 		msg_terminate      = (terminate_flag),
162 		msg_interrupt      = (terminate_flag | interrupt_flag),
163 		msg_sync_restart   = (sync_flag | restart_flag),
164 		msg_split          = split_flag
165 	};
166 	struct Generator {
167 		enum State { start = 0, search = 1, model = 2, done = 3 };
GeneratorClasp::mt::ParallelSolve::SharedData::Generator168 		Generator() : state(start) {}
169 		mutex genM;
170 		condition_variable cond;
waitWhileClasp::mt::ParallelSolve::SharedData::Generator171 		State waitWhile(State st) {
172 			State r;
173 			for (unique_lock<mutex> lock(genM); (r = state) == st;) { cond.wait(lock); }
174 			return r;
175 		}
pushModelClasp::mt::ParallelSolve::SharedData::Generator176 		void pushModel() {
177 			notify(model);
178 			waitWhile(model);
179 		}
notifyClasp::mt::ParallelSolve::SharedData::Generator180 		void notify(State st) {
181 			unique_lock<mutex> lock(genM);
182 			state = st;
183 			cond.notify_one();
184 		}
185 		State state;
186 	};
SharedDataClasp::mt::ParallelSolve::SharedData187 	SharedData() : path(0) { reset(0); control = 0; }
resetClasp::mt::ParallelSolve::SharedData188 	void reset(SharedContext* a_ctx) {
189 		clearQueue();
190 		syncT.reset();
191 		workSem.unsafe_init(0, a_ctx ? a_ctx->concurrency() : 0);
192 		msg.clear();
193 		globalR.reset();
194 		maxConflict = globalR.current();
195 		errorSet    = 0;
196 		initVec     = 0;
197 		ctx         = a_ctx;
198 		path        = 0;
199 		nextId      = 1;
200 		workReq     = 0;
201 		restartReq  = 0;
202 		generator   = 0;
203 		errorCode   = 0;
204 	}
clearQueueClasp::mt::ParallelSolve::SharedData205 	void clearQueue() {
206 		while (!workQ.empty()) { delete workQ.pop_ret(); }
207 		workQ.clear();
208 	}
requestWorkClasp::mt::ParallelSolve::SharedData209 	const LitVec* requestWork(const Solver& s) {
210 		const uint64 m(uint64(1) << s.id());
211 		if ((initVec & m) == m) {
212 			if (!allowSplit()) {
213 				// portfolio mode - all solvers can start with initial path
214 				initVec -= m;
215 				return path;
216 			}
217 			else if (initVec.exchange(0) != 0) {
218 				// splitting mode - only one solver must start with initial path
219 				return path;
220 			}
221 		}
222 		if (!allowSplit()) { return 0; }
223 		// try to get work from split
224 		ctx->report(MessageEvent(s, "SPLIT", MessageEvent::sent));
225 		const uint32 flags = uint32(terminate_flag) | uint32(sync_flag);
226 		for (unique_lock<mutex> lock(workSem.toMutex());;) {
227 			if (!workQ.empty()) {
228 				const LitVec* res = workQ.pop_ret();
229 				if (workQ.empty()) { workQ.clear(); }
230 				return res;
231 			}
232 			postMessage(SharedData::msg_split, false);
233 			if (!workSem.down(lock) || hasControl(flags)) {
234 				return 0;
235 			}
236 		}
237 	}
pushWorkClasp::mt::ParallelSolve::SharedData238 	void pushWork(const LitVec* v) {
239 		unique_lock<mutex> lock(workSem.toMutex());
240 		workQ.push(v);
241 		workSem.up(lock);
242 	}
243 	// MESSAGES
244 	bool postMessage(Message m, bool notify);
hasMessageClasp::mt::ParallelSolve::SharedData245 	bool hasMessage()  const { return (control & uint32(7)) != 0; }
synchronizeClasp::mt::ParallelSolve::SharedData246 	bool synchronize() const { return (control & uint32(sync_flag))      != 0; }
terminateClasp::mt::ParallelSolve::SharedData247 	bool terminate()   const { return (control & uint32(terminate_flag)) != 0; }
splitClasp::mt::ParallelSolve::SharedData248 	bool split()       const { return (control & uint32(split_flag))     != 0; }
aboutToSplitClasp::mt::ParallelSolve::SharedData249 	void aboutToSplit()      { if (--workReq == 0) updateSplitFlag();  }
250 	void updateSplitFlag();
251 	// CONTROL FLAGS
hasControlClasp::mt::ParallelSolve::SharedData252 	bool hasControl(uint32 f) const { return (control & f) != 0;        }
interruptClasp::mt::ParallelSolve::SharedData253 	bool interrupt()          const { return hasControl(interrupt_flag);}
completeClasp::mt::ParallelSolve::SharedData254 	bool complete()           const { return hasControl(complete_flag); }
restartClasp::mt::ParallelSolve::SharedData255 	bool restart()            const { return hasControl(restart_flag);  }
allowSplitClasp::mt::ParallelSolve::SharedData256 	bool allowSplit()         const { return hasControl(allow_split_flag); }
allowRestartClasp::mt::ParallelSolve::SharedData257 	bool allowRestart()       const { return !hasControl(forbid_restart_flag); }
setControlClasp::mt::ParallelSolve::SharedData258 	bool setControl(uint32 flags)   { return (control.fetch_or(flags) & flags) != flags; }
clearControlClasp::mt::ParallelSolve::SharedData259 	bool clearControl(uint32 flags) { return (control.fetch_and(~flags) & flags) == flags; }
260 	typedef SingleOwnerPtr<Generator> GeneratorPtr;
261 	Potassco::StringBuilder msg;  // global error message
262 	ScheduleStrategy globalR;     // global restart strategy
263 	uint64           maxConflict; // current restart limit
264 	atomic<uint64>   errorSet;    // bitmask of erroneous solvers
265 	SharedContext*   ctx;         // shared context object
266 	const LitVec*    path;        // initial guiding path - typically empty
267 	atomic<uint64>   initVec;     // vector of initial gp - represented as bitset
268 	GeneratorPtr     generator;   // optional data for model generation
269 	Timer<RealTime>  syncT;       // thread sync time
270 	mutex            modelM;      // model-mutex
271 	BarrierSemaphore workSem;     // work-semaphore
272 	Queue            workQ;       // work-queue (must be protected by workSem)
273 	uint32           nextId;      // next solver id to use
274 	LowerBound       lower_;      // last reported lower bound (if any)
275 	atomic<int>      workReq;     // > 0: someone needs work
276 	atomic<uint32>   restartReq;  // == numThreads(): restart
277 	atomic<uint32>   control;     // set of active message flags
278 	atomic<uint32>   modCount;    // coounter for synchronizing models
279 	uint32           errorCode;   // global error code
280 };
281 
282 // post message to all threads
postMessage(Message m,bool notifyWaiting)283 bool ParallelSolve::SharedData::postMessage(Message m, bool notifyWaiting) {
284 	if (m == msg_split) {
285 		if (++workReq == 1) { updateSplitFlag(); }
286 		return true;
287 	}
288 	else if (setControl(m)) {
289 		// control message - notify all if requested
290 		if (notifyWaiting) workSem.reset();
291 		if ((uint32(m) & uint32(sync_flag|terminate_flag)) != 0) {
292 			syncT.reset();
293 			syncT.start();
294 		}
295 		return true;
296 	}
297 	return false;
298 }
299 
updateSplitFlag()300 void ParallelSolve::SharedData::updateSplitFlag() {
301 	for (bool splitF;;) {
302 		splitF = (workReq > 0);
303 		if (split() == splitF) return;
304 		if (splitF) control.fetch_or(uint32(split_flag));
305 		else        control.fetch_and(~uint32(split_flag));
306 	}
307 }
308 /////////////////////////////////////////////////////////////////////////////////////////
309 // ParallelSolve
310 /////////////////////////////////////////////////////////////////////////////////////////
ParallelSolve(const ParallelSolveOptions & opts)311 ParallelSolve::ParallelSolve(const ParallelSolveOptions& opts)
312 	: SolveAlgorithm(opts.limit)
313 	, shared_(new SharedData)
314 	, thread_(0)
315 	, distribution_(opts.distribute)
316 	, maxRestarts_(0)
317 	, intGrace_(1024)
318 	, intTopo_(opts.integrate.topo)
319 	, intFlags_(ClauseCreator::clause_not_root_sat | ClauseCreator::clause_no_add)
320 	, modeSplit_(opts.algorithm.mode == ParallelSolveOptions::Algorithm::mode_split) {
321 	setRestarts(opts.restarts.maxR, opts.restarts.sched);
322 	setIntegrate(opts.integrate.grace, opts.integrate.filter);
323 }
324 
~ParallelSolve()325 ParallelSolve::~ParallelSolve() {
326 	if (shared_->nextId > 1) {
327 		// algorithm was not started but there may be active threads -
328 		// force orderly shutdown
329 		ParallelSolve::doInterrupt();
330 		shared_->workSem.removeParty(true);
331 		joinThreads();
332 	}
333 	destroyThread(masterId);
334 	delete shared_;
335 }
336 
beginSolve(SharedContext & ctx,const LitVec & path)337 bool ParallelSolve::beginSolve(SharedContext& ctx, const LitVec& path) {
338 	assert(ctx.concurrency() && "Illegal number of threads");
339 	if (shared_->terminate()) { return false; }
340 	shared_->reset(&ctx);
341 	if (!enumerator().supportsParallel() && numThreads() > 1) {
342 		ctx.warn("Selected reasoning mode implies #Threads=1.");
343 		shared_->workSem.unsafe_init(1);
344 		modeSplit_ = false;
345 		ctx.setConcurrency(1, SharedContext::resize_reserve);
346 	}
347 	shared_->setControl(modeSplit_ ? SharedData::allow_split_flag : SharedData::forbid_restart_flag);
348 	shared_->modCount = uint32(enumerator().optimize());
349 	shared_->path = &path;
350 	if (distribution_.types != 0 && ctx.distributor.get() == 0 && numThreads() > 1) {
351 		if (distribution_.mode == ParallelSolveOptions::Distribution::mode_local) {
352 			ctx.distributor.reset(new mt::LocalDistribution(distribution_, ctx.concurrency(), intTopo_));
353 		}
354 		else {
355 			ctx.distributor.reset(new mt::GlobalDistribution(distribution_, ctx.concurrency(), intTopo_));
356 		}
357 	}
358 	shared_->setControl(SharedData::sync_flag); // force initial sync with all threads
359 	shared_->syncT.start();
360 	reportProgress(MessageEvent(*ctx.master(), "SYNC", MessageEvent::sent));
361 	assert(ctx.master()->id() == masterId);
362 	allocThread(masterId, *ctx.master());
363 	for (uint32 i = 1; i != ctx.concurrency(); ++i) {
364 		uint32 id = shared_->nextId++;
365 		allocThread(id, *ctx.solver(id));
366 		// start in new thread
367 		Clasp::mt::thread x(std::mem_fun(&ParallelSolve::solveParallel), this, id);
368 		thread_[id]->setThread(x);
369 	}
370 	return true;
371 }
372 
setIntegrate(uint32 grace,uint8 filter)373 void ParallelSolve::setIntegrate(uint32 grace, uint8 filter) {
374 	typedef ParallelSolveOptions::Integration Dist;
375 	intGrace_ = grace;
376 	intFlags_ = ClauseCreator::clause_no_add;
377 	if (filter == Dist::filter_heuristic) { store_set_bit(intFlags_, 31); }
378 	if (filter != Dist::filter_no)        { intFlags_ |= ClauseCreator::clause_not_root_sat; }
379 	if (filter == Dist::filter_sat)       { intFlags_ |= ClauseCreator::clause_not_sat; }
380 }
381 
setRestarts(uint32 maxR,const ScheduleStrategy & rs)382 void ParallelSolve::setRestarts(uint32 maxR, const ScheduleStrategy& rs) {
383 	maxRestarts_         = maxR;
384 	shared_->globalR     = maxR ? rs : ScheduleStrategy::none();
385 	shared_->maxConflict = shared_->globalR.current();
386 }
387 
numThreads() const388 uint32 ParallelSolve::numThreads() const { return shared_->workSem.parties(); }
389 
allocThread(uint32 id,Solver & s)390 void ParallelSolve::allocThread(uint32 id, Solver& s) {
391 	if (!thread_) {
392 		uint32 n = numThreads();
393 		thread_  = new ParallelHandler*[n];
394 		std::fill(thread_, thread_+n, static_cast<ParallelHandler*>(0));
395 	}
396 	size_t sz   = ((sizeof(ParallelHandler)+63) / 64) * 64;
397 	thread_[id] = new (alignedAlloc(sz, 64)) ParallelHandler(*this, s);
398 }
399 
destroyThread(uint32 id)400 void ParallelSolve::destroyThread(uint32 id) {
401 	if (thread_ && thread_[id]) {
402 		assert(!thread_[id]->joinable() && "Shutdown not completed!");
403 		thread_[id]->~ParallelHandler();
404 		alignedFree(thread_[id]);
405 		thread_[id] = 0;
406 		if (id == masterId) {
407 			delete [] thread_;
408 			thread_ = 0;
409 		}
410 	}
411 }
412 
reportProgress(const Event & ev) const413 inline void ParallelSolve::reportProgress(const Event& ev) const {
414 	return shared_->ctx->report(ev);
415 }
reportProgress(const Solver & s,const char * msg) const416 inline void ParallelSolve::reportProgress(const Solver& s, const char* msg) const {
417 	return shared_->ctx->report(msg, &s);
418 }
419 
420 // joins with and destroys all active threads
joinThreads()421 int ParallelSolve::joinThreads() {
422 	uint32 winner = thread_[masterId]->winner() ? uint32(masterId) : UINT32_MAX;
423 	// detach master only after all client threads are done
424 	for (uint32 i = 1, end = shared_->nextId; i != end; ++i) {
425 		thread_[i]->join();
426 		if (thread_[i]->winner() && i < winner) {
427 			winner = i;
428 		}
429 		Solver* s = &thread_[i]->solver();
430 		reportProgress(*s, "joined");
431 		destroyThread(i);
432 		reportProgress(*s, "destroyed");
433 	}
434 	if (shared_->complete()) {
435 		enumerator().commitComplete();
436 	}
437 	thread_[masterId]->handleTerminateMessage();
438 	shared_->ctx->setWinner(winner);
439 	shared_->nextId = 1;
440 	shared_->syncT.stop();
441 	reportProgress(MessageEvent(*shared_->ctx->master(), "TERMINATE", MessageEvent::completed, shared_->syncT.total()));
442 	return !shared_->interrupt() ? thread_[masterId]->error() : shared_->errorCode;
443 }
444 
doStart(SharedContext & ctx,const LitVec & assume)445 void ParallelSolve::doStart(SharedContext& ctx, const LitVec& assume) {
446 	if (beginSolve(ctx, assume)) {
447 		// start computation in new thread
448 		shared_->generator = new SharedData::Generator();
449 		Clasp::mt::thread x(std::mem_fun(&ParallelSolve::solveParallel), this, static_cast<uint32>(masterId));
450 		thread_[masterId]->setThread(x);
451 	}
452 }
doNext(int)453 int ParallelSolve::doNext(int) {
454 	POTASSCO_REQUIRE(shared_->generator.get(), "Invalid operation");
455 	int s = shared_->generator->state;
456 	if (s != SharedData::Generator::done) {
457 		shared_->generator->notify(SharedData::Generator::search);
458 		if ((s = shared_->generator->waitWhile(SharedData::Generator::search)) == SharedData::Generator::model) {
459 			return value_true;
460 		}
461 	}
462 	return shared_->complete() ? value_false : value_free;
463 }
doStop()464 void ParallelSolve::doStop() {
465 	if (shared_->nextId <= 1) { return; }
466 	reportProgress(*shared_->ctx->master(), "joining with other threads");
467 	if (shared_->generator.get()) {
468 		shared_->setControl(SharedData::terminate_flag);
469 		shared_->generator->notify(SharedData::Generator::done);
470 		thread_[masterId]->join();
471 	}
472 	int err = joinThreads();
473 	shared_->generator = 0;
474 	shared_->ctx->distributor.reset(0);
475 	switch(err) {
476 		case 0: break;
477 		case LogicError:   throw std::logic_error(shared_->msg.c_str());
478 		case RuntimeError: throw std::runtime_error(shared_->msg.c_str());
479 		case OutOfMemory:  throw std::bad_alloc();
480 		default:           throw std::runtime_error(shared_->msg.c_str());
481 	}
482 }
483 
doDetach()484 void ParallelSolve::doDetach() {
485 	// detach master only after all client threads are done
486 	thread_[masterId]->detach(*shared_->ctx, shared_->interrupt());
487 	destroyThread(masterId);
488 }
489 
490 // Entry point for master solver
doSolve(SharedContext & ctx,const LitVec & path)491 bool ParallelSolve::doSolve(SharedContext& ctx, const LitVec& path) {
492 	if (beginSolve(ctx, path)) {
493 		solveParallel(masterId);
494 		ParallelSolve::doStop();
495 	}
496 	return !shared_->complete();
497 }
498 
499 // main solve loop executed by all threads
solveParallel(uint32 id)500 void ParallelSolve::solveParallel(uint32 id) {
501 	Solver& s = thread_[id]->solver();
502 	SolverStats agg;
503 	PathPtr a(0);
504 	if (id == masterId && shared_->generator.get()) {
505 		shared_->generator->waitWhile(SharedData::Generator::start);
506 	}
507 	try {
508 		// establish solver<->handler connection and attach to shared context
509 		// should this fail because of an initial conflict, we'll terminate
510 		// in requestWork.
511 		thread_[id]->attach(*shared_->ctx);
512 		BasicSolve solve(s, limits());
513 		agg.enable(s.stats);
514 		for (GpType t; requestWork(s, a);) {
515 			agg.accu(s.stats);
516 			s.stats.reset();
517 			thread_[id]->setGpType(t = ((a.is_owner() || modeSplit_) ? gp_split : gp_fixed));
518 			if (enumerator().start(s, *a, a.is_owner()) && thread_[id]->solveGP(solve, t, shared_->maxConflict) == value_free) {
519 				terminate(s, false);
520 			}
521 			s.clearStopConflict();
522 			s.undoUntil(0);
523 			enumerator().end(s);
524 		}
525 	}
526 	catch (const std::bad_alloc&)       { exception(id, a, OutOfMemory,  "bad alloc"); }
527 	catch (const std::logic_error& e)   { exception(id, a, LogicError,   e.what()); }
528 	catch (const std::runtime_error& e) { exception(id, a, RuntimeError, e.what()); }
529 	catch (const std::exception& e)     { exception(id, a, RuntimeError, e.what()); }
530 	catch (...)                         { exception(id, a, UnknownError, "unknown");  }
531 	assert(shared_->terminate() || thread_[id]->error());
532 	// this thread is leaving
533 	int active = shared_->workSem.removeParty(shared_->terminate());
534 	// update stats
535 	s.stats.accu(agg);
536 	if (id != masterId) {
537 		// remove solver<->handler connection and detach from shared context
538 		// note: since detach can change the problem, we must not yet detach the master
539 		// because some client might still be in the middle of an attach operation
540 		thread_[id]->detach(*shared_->ctx, shared_->interrupt());
541 		s.stats.addCpuTime(ThreadTime::getTime());
542 	}
543 	if (active == 1 && shared_->generator.get()) {
544 		shared_->generator->notify(SharedData::Generator::done);
545 	}
546 }
547 
exception(uint32 id,PathPtr & path,ErrorCode e,const char * what)548 void ParallelSolve::exception(uint32 id, PathPtr& path, ErrorCode e, const char* what) {
549 	try {
550 		if (!thread_[id]->setError(e) || e != OutOfMemory || shared_->workSem.active()) {
551 			ParallelSolve::doInterrupt();
552 			if (shared_->errorSet.fetch_or(bit_mask<uint64>(id)) == 0) {
553 				shared_->errorCode = e;
554 				shared_->msg.appendFormat("[%u]: %s", id, what);
555 			}
556 		}
557 		else if (path.get() && shared_->allowSplit()) {
558 			shared_->workQ.push(path.release());
559 			shared_->workSem.up();
560 		}
561 		reportProgress(thread_[id]->solver(), e == OutOfMemory ? "Thread failed with out of memory" : "Thread failed with error");
562 	}
563 	catch(...) { ParallelSolve::doInterrupt(); }
564 }
565 
566 // forced termination from outside
doInterrupt()567 bool ParallelSolve::doInterrupt() {
568 	// do not notify blocked threads to avoid possible
569 	// deadlock in semaphore!
570 	shared_->postMessage(SharedData::msg_interrupt, false);
571 	return true;
572 }
573 
574 // tries to get new work for the given solver
requestWork(Solver & s,PathPtr & out)575 bool ParallelSolve::requestWork(Solver& s, PathPtr& out) {
576 	const LitVec* a = 0;
577 	for (int popped = 0; !shared_->terminate();) {
578 		// only clear path and stop conflict - we don't propagate() here
579 		// because we would then have to handle any eventual conflicts
580 		if (++popped == 1 && !s.popRootLevel(s.rootLevel())) {
581 			// s has a real top-level conflict - problem is unsat
582 			terminate(s, true);
583 		}
584 		else if (shared_->synchronize()) {
585 			// a synchronize request is active - we are fine with
586 			// this but did not yet had a chance to react on it
587 			waitOnSync(s);
588 		}
589 		else if (a || (a = shared_->requestWork(s)) != 0) {
590 			assert(s.decisionLevel() == 0);
591 			// got new work from work-queue
592 			out = a;
593 			// do not take over ownership of initial gp!
594 			if (a == shared_->path) { out.release(); }
595 			// propagate any new facts before starting new work
596 			if (s.simplify())       { return true; }
597 			// s now has a conflict - either an artifical stop conflict
598 			// or a real conflict - we'll handle it in the next iteration
599 			// via the call to popRootLevel()
600 			popped = 0;
601 		}
602 		else if (!shared_->allowSplit() || !shared_->synchronize()) {
603 			// no work left - quitting time?
604 			terminate(s, true);
605 		}
606 	}
607 	return false;
608 }
609 
610 // terminated from inside of algorithm
611 // check if there is more to do
terminate(Solver & s,bool complete)612 void ParallelSolve::terminate(Solver& s, bool complete) {
613 	if (!shared_->terminate()) {
614 		if (enumerator().tentative() && complete) {
615 			if (shared_->setControl(SharedData::sync_flag|SharedData::complete_flag)) {
616 				thread_[s.id()]->setWinner();
617 				reportProgress(MessageEvent(s, "SYNC", MessageEvent::sent));
618 			}
619 		}
620 		else {
621 			reportProgress(MessageEvent(s, "TERMINATE", MessageEvent::sent));
622 			shared_->postMessage(SharedData::msg_terminate, true);
623 			thread_[s.id()]->setWinner();
624 			if (complete) { shared_->setControl(SharedData::complete_flag); }
625 		}
626 	}
627 }
628 
629 // handles an active synchronization request
630 // returns true to signal that s should restart otherwise false
waitOnSync(Solver & s)631 bool ParallelSolve::waitOnSync(Solver& s) {
632 	if (!thread_[s.id()]->handleRestartMessage()) {
633 		shared_->setControl(SharedData::cancel_restart_flag);
634 	}
635 	bool hasPath  = thread_[s.id()]->hasPath();
636 	bool tentative= enumerator().tentative();
637 	if (shared_->workSem.wait()) {
638 		// last man standing - complete synchronization request
639 		shared_->workReq     = 0;
640 		shared_->restartReq  = 0;
641 		bool restart         = shared_->hasControl(SharedData::restart_flag);
642 		bool init            = true;
643 		if (restart) {
644 			init = shared_->allowRestart() && !shared_->hasControl(SharedData::cancel_restart_flag);
645 			if (init) { shared_->globalR.next(); }
646 			shared_->maxConflict = shared_->allowRestart() && shared_->globalR.idx < maxRestarts_
647 				? shared_->globalR.current()
648 				: UINT64_MAX;
649 		}
650 		else if (shared_->maxConflict != UINT64_MAX && !shared_->allowRestart()) {
651 			shared_->maxConflict = UINT64_MAX;
652 		}
653 		if (init) { initQueue();  }
654 		else      { shared_->setControl(SharedData::restart_abandoned_flag); }
655 		if (tentative && shared_->complete()) {
656 			if (enumerator().commitComplete()) { shared_->setControl(SharedData::terminate_flag); }
657 			else                               { shared_->modCount = uint32(0); shared_->clearControl(SharedData::complete_flag); }
658 		}
659 		shared_->clearControl(SharedData::msg_split | SharedData::msg_sync_restart | SharedData::restart_abandoned_flag | SharedData::cancel_restart_flag);
660 		shared_->syncT.lap();
661 		reportProgress(MessageEvent(s, "SYNC", MessageEvent::completed, shared_->syncT.elapsed()));
662 		// wake up all blocked threads
663 		shared_->workSem.reset();
664 	}
665 	return shared_->terminate() || (hasPath && !shared_->hasControl(SharedData::restart_abandoned_flag));
666 }
667 
668 // If guiding path scheme is active only one
669 // thread can start with gp (typically empty) and this
670 // thread must then split up search-space dynamically.
671 // Otherwise, all threads can start with initial gp
672 // TODO:
673 //  heuristic for initial splits?
initQueue()674 void ParallelSolve::initQueue() {
675 	shared_->clearQueue();
676 	if (shared_->allowSplit() && modeSplit_ && !enumerator().supportsSplitting(*shared_->ctx)) {
677 		shared_->ctx->warn("Selected strategies imply Mode=compete.");
678 		shared_->clearControl(SharedData::allow_split_flag);
679 		shared_->setControl(SharedData::forbid_restart_flag);
680 		modeSplit_ = false;
681 	}
682 	shared_->initVec = UINT64_MAX;
683 	assert(shared_->allowSplit() || shared_->hasControl(SharedData::forbid_restart_flag));
684 }
685 
686 // adds work to the work-queue
pushWork(LitVec * v)687 void ParallelSolve::pushWork(LitVec* v) {
688 	assert(v);
689 	shared_->pushWork(v);
690 }
691 
692 // called whenever some solver proved unsat
commitUnsat(Solver & s)693 bool ParallelSolve::commitUnsat(Solver& s) {
694 	const int supUnsat = enumerator().unsatType();
695 	if (supUnsat == Enumerator::unsat_stop || shared_->terminate() || shared_->synchronize()) {
696 		return false;
697 	}
698 	unique_lock<mutex> lock(shared_->modelM, defer_lock_t());
699 	if (supUnsat == Enumerator::unsat_sync) {
700 		lock.lock();
701 	}
702 	bool result = enumerator().commitUnsat(s);
703 	if (lock.owns_lock()) {
704 		lock.unlock();
705 	}
706 	if (!thread_[s.id()]->disjointPath()) {
707 		if (result) {
708 			++shared_->modCount;
709 			if (s.lower.bound > 0) {
710 				lock.lock();
711 				if (s.lower.bound > shared_->lower_.bound || s.lower.level > shared_->lower_.level) {
712 					shared_->lower_ = s.lower;
713 					reportUnsat(s);
714 					++shared_->modCount;
715 				}
716 				lock.unlock();
717 			}
718 		}
719 		else { terminate(s, true);  }
720 	}
721 	return result;
722 }
723 
724 // called whenever some solver has found a model
commitModel(Solver & s)725 bool ParallelSolve::commitModel(Solver& s) {
726 	// grab lock - models must be processed sequentially
727 	// in order to simplify printing and to avoid duplicates
728 	// in all non-trivial enumeration modes
729 	bool stop = false;
730 	{lock_guard<mutex> lock(shared_->modelM);
731 	// first check if the model is still valid once all
732 	// information is integrated into the solver
733 	if (thread_[s.id()]->isModelLocked(s) && (stop=shared_->terminate()) == false && enumerator().commitModel(s)) {
734 		if (enumerator().lastModel().num == 1 && !enumerator().supportsRestarts()) {
735 			// switch to backtracking based splitting algorithm
736 			// the solver's gp will act as the root for splitting and is
737 			// from now on disjoint from all other gps
738 			shared_->setControl(SharedData::forbid_restart_flag | SharedData::allow_split_flag);
739 			thread_[s.id()]->setGpType(gp_split);
740 			enumerator().setDisjoint(s, true);
741 		}
742 		if (shared_->generator.get()) {
743 			shared_->generator->pushModel();
744 		}
745 		else if ((stop = !reportModel(s)) == true) {
746 			// must be called while holding the lock - otherwise
747 			// we have a race condition with solvers that
748 			// are currently blocking on the mutex and we could enumerate
749 			// more models than requested by the user
750 			terminate(s, !moreModels(s));
751 		}
752 		++shared_->modCount;
753 	}}
754 	return !stop;
755 }
756 
hasErrors() const757 uint64 ParallelSolve::hasErrors() const {
758 	return shared_->errorSet != 0u;
759 }
interrupted() const760 bool ParallelSolve::interrupted() const {
761 	return shared_->interrupt();
762 }
// Resets the shared control word to 0.
void ParallelSolve::resetSolve() {
	shared_->control = 0;
}
// Does nothing: the body is empty.
void ParallelSolve::enableInterrupts() {}
767 // updates s with new messages and uses s to create a new guiding path
768 // if necessary and possible
handleMessages(Solver & s)769 bool ParallelSolve::handleMessages(Solver& s) {
770 	// check if there are new messages for s
771 	if (!shared_->hasMessage()) {
772 		// nothing to do
773 		return true;
774 	}
775 	ParallelHandler* h = thread_[s.id()];
776 	if (shared_->terminate()) {
777 		reportProgress(MessageEvent(s, "TERMINATE", MessageEvent::received));
778 		h->handleTerminateMessage();
779 		s.setStopConflict();
780 		return false;
781 	}
782 	if (shared_->synchronize()) {
783 		reportProgress(MessageEvent(s, "SYNC", MessageEvent::received));
784 		if (waitOnSync(s)) {
785 			s.setStopConflict();
786 			return false;
787 		}
788 		return true;
789 	}
790 	if (h->disjointPath() && s.splittable() && shared_->workReq > 0) {
791 		// First declare split request as handled
792 		// and only then do the actual split.
793 		// This way, we minimize the chance for
794 		// "over"-splitting, i.e. one split request handled
795 		// by more than one thread.
796 		shared_->aboutToSplit();
797 		reportProgress(MessageEvent(s, "SPLIT", MessageEvent::received));
798 		h->handleSplitMessage();
799 		enumerator().setDisjoint(s, true);
800 	}
801 	return true;
802 }
803 
integrateModels(Solver & s,uint32 & upCount)804 bool ParallelSolve::integrateModels(Solver& s, uint32& upCount) {
805 	uint32 gCount = shared_->modCount;
806 	return gCount == upCount || (enumerator().update(s) && (upCount = gCount) == gCount);
807 }
808 
requestRestart()809 void ParallelSolve::requestRestart() {
810 	if (shared_->allowRestart() && ++shared_->restartReq == numThreads()) {
811 		shared_->postMessage(SharedData::msg_sync_restart, true);
812 	}
813 }
814 
createSolveObject() const815 SolveAlgorithm* ParallelSolveOptions::createSolveObject() const {
816 	return numSolver() > 1 ? new ParallelSolve(*this) : BasicSolveOptions::createSolveObject();
817 }
818 ////////////////////////////////////////////////////////////////////////////////////
819 // ParallelHandler
820 /////////////////////////////////////////////////////////////////////////////////////////
ParallelHandler(ParallelSolve & ctrl,Solver & s)821 ParallelHandler::ParallelHandler(ParallelSolve& ctrl, Solver& s)
822 	: MessageHandler()
823 	, ctrl_(&ctrl)
824 	, solver_(&s)
825 	, received_(0)
826 	, recEnd_(0)
827 	, intEnd_(0)
828 	, error_(0)
829 	, win_(0)
830 	, up_(0) {
831 	this->next = this;
832 }
833 
~ParallelHandler()834 ParallelHandler::~ParallelHandler() { clearDB(0); delete [] received_; }
835 
836 // adds this as post propagator to its solver and attaches the solver to ctx.
attach(SharedContext & ctx)837 bool ParallelHandler::attach(SharedContext& ctx) {
838 	assert(solver_);
839 	gp_.reset();
840 	error_ = 0;
841 	win_   = 0;
842 	up_    = 0;
843 	act_   = 0;
844 	lbd_   = solver_->searchConfig().reduce.strategy.glue != 0;
845 	next   = 0;
846 	if (!received_ && ctx.distributor.get()) {
847 		received_ = new SharedLiterals*[RECEIVE_BUFFER_SIZE];
848 	}
849 	ctx.report("attach", solver_);
850 	solver_->addPost(this);
851 	return ctx.attach(solver_->id());
852 }
853 
854 // removes this from the list of post propagators of its solver and detaches the solver from ctx.
detach(SharedContext & ctx,bool)855 void ParallelHandler::detach(SharedContext& ctx, bool) {
856 	handleTerminateMessage();
857 	ctx.report("detach", solver_);
858 	if (solver_->sharedContext() == &ctx) {
859 		clearDB(!error() ? solver_ : 0);
860 		ctx.report("detached db", solver_);
861 		ctx.detach(*solver_, error() != 0);
862 		ctx.report("detached ctx", solver_);
863 	}
864 }
865 
setError(int code)866 bool ParallelHandler::setError(int code) {
867 	error_ = code;
868 	return thread_.joinable() && !winner();
869 }
870 
clearDB(Solver * s)871 void ParallelHandler::clearDB(Solver* s) {
872 	for (ClauseDB::iterator it = integrated_.begin(), end = integrated_.end(); it != end; ++it) {
873 		ClauseHead* c = static_cast<ClauseHead*>(*it);
874 		if (s) { s->addLearnt(c, c->size(), Constraint_t::Other); }
875 		else   { c->destroy(); }
876 	}
877 	integrated_.clear();
878 	intEnd_= 0;
879 	for (uint32 i = 0; i != recEnd_; ++i) { received_[i]->release(); }
880 	recEnd_= 0;
881 }
882 
solveGP(BasicSolve & solve,GpType t,uint64 restart)883 ValueRep ParallelHandler::solveGP(BasicSolve& solve, GpType t, uint64 restart) {
884 	ValueRep res = value_free;
885 	Solver&  s   = solve.solver();
886 	bool     fin = false;
887 	gp_.reset(restart, t);
888 	assert(act_ == 0);
889 	do {
890 		win_ = 0;
891 		ctrl_->integrateModels(s, gp_.modCount);
892 		up_ = act_ = 1; // activate enumerator and bounds
893 		res = solve.solve();
894 		up_ = act_ = 0; // de-activate enumerator and bounds
895 		fin = true;
896 		if      (res == value_true)  { if (ctrl_->commitModel(s)) { fin = false; } }
897 		else if (res == value_false) { if (ctrl_->commitUnsat(s)) { fin = false; gp_.reset(restart, gp_.type); } }
898 	} while (!fin);
899 	return res;
900 }
901 
902 // detach from solver, i.e. ignore any further messages
handleTerminateMessage()903 void ParallelHandler::handleTerminateMessage() {
904 	if (this->next != this) {
905 		// mark removed propagator by creating "self-loop"
906 		solver_->removePost(this);
907 		this->next = this;
908 	}
909 }
910 
911 // split-off new guiding path and add it to solve object
handleSplitMessage()912 void ParallelHandler::handleSplitMessage() {
913 	assert(solver_ && "ParallelHandler::handleSplitMessage(): not attached!");
914 	assert(solver_->splittable());
915 	Solver& s = *solver_;
916 	SingleOwnerPtr<LitVec> newPath(new LitVec());
917 	s.split(*newPath);
918 	ctrl_->pushWork(newPath.release());
919 }
920 
// Handles a restart message. Currently does no work and always returns true.
bool ParallelHandler::handleRestartMessage() {
	// TODO
	// we may want to implement some heuristic, like
	// computing a local var order.
	return true;
}
927 
simplify(Solver & s,bool sh)928 bool ParallelHandler::simplify(Solver& s, bool sh) {
929 	ClauseDB::size_type i, j, end = integrated_.size();
930 	for (i = j = 0; i != end; ++i) {
931 		Constraint* c = integrated_[i];
932 		if (c->simplify(s, sh)) {
933 			c->destroy(&s, false);
934 			intEnd_ -= (i < intEnd_);
935 		}
936 		else                    {
937 			integrated_[j++] = c;
938 		}
939 	}
940 	shrinkVecTo(integrated_, j);
941 	if (intEnd_ > sizeVec(integrated_)) intEnd_ = sizeVec(integrated_);
942 	return false;
943 }
944 
// Post propagator entry point: handles pending messages and, if updates
// are enabled, integrates new models/lemmas into s.
// Returns false if message handling, integration, or the follow-up
// propagation failed; true otherwise.
bool ParallelHandler::propagateFixpoint(Solver& s, PostPropagator* ctx) {
	// Check for messages and integrate any new information from
	// models/lemma exchange but only if path is setup.
	// Skip updates if called from other post propagator so that we do not
	// disturb any active propagation.
	// up is 1 if updates are enabled (no calling propagator and up_ set)
	// and 0 otherwise; in the latter case only messages are handled.
	if (int up = (ctx == 0 && up_ != 0)) {
		// toggle up_ by the solver's update mode
		up_ ^= (uint32)s.updateMode();
		// up becomes 2 if the path is not active or if up_ is still set and
		// the choice counter is not a multiple of 64
		up  += (act_ == 0 || (up_ && (s.stats.choices & 63) != 0));
		// request a restart once the conflict limit is reached and double the limit
		if (s.stats.conflicts >= gp_.restart)  { ctrl_->requestRestart(); gp_.restart *= 2; }
		for (uint32 cDL = s.decisionLevel();;) {
			// up > 1: integrate received clauses via integrate();
			// otherwise only integrate models
			bool ok = ctrl_->handleMessages(s) && (up > 1 ? integrate(s) : ctrl_->integrateModels(s, gp_.modCount));
			if (!ok)                         { return false; }
			if (cDL != s.decisionLevel())    { // cancel active propagation on cDL
				cancelPropagation();
				cDL = s.decisionLevel();
			}
			// With an empty queue, up is incremented and the loop ends once it
			// reaches 3; starting from up == 1 this yields one more pass that
			// then uses integrate(). Otherwise propagate up to this
			// propagator and repeat.
			if      (!s.queueSize())         { if (++up == 3) return true; }
			else if (!s.propagateUntil(this)){ return false; }
		}
	}
	return ctrl_->handleMessages(s);
}
967 
968 // checks whether s still has a model once all
969 // information from previously found models were integrated
isModel(Solver & s)970 bool ParallelHandler::isModel(Solver& s) {
971 	assert(s.numFreeVars() == 0);
972 	// either no unprocessed updates or still a model after
973 	// updates were integrated
974 	return ctrl_->integrateModels(s, gp_.modCount)
975 		&& s.numFreeVars() == 0
976 		&& s.queueSize()   == 0;
977 }
978 
isModelLocked(Solver & s)979 bool ParallelHandler::isModelLocked(Solver& s) {
980 	const uint32 current = gp_.modCount;
981 	if (!isModel(s))
982 		return false;
983 	if (current == gp_.modCount)
984 		return true;
985 	for (PostPropagator* p = s.getPost(PostPropagator::priority_class_general); p; p = p->next) {
986 		if (!p->isModel(s))
987 			return false;
988 	}
989 	return true;
990 }
991 
integrate(Solver & s)992 bool ParallelHandler::integrate(Solver& s) {
993 	uint32 rec = recEnd_ + s.receive(received_ + recEnd_, RECEIVE_BUFFER_SIZE - recEnd_);
994 	if (!rec) { return true; }
995 	ClauseCreator::Result ret;
996 	uint32 dl       = s.decisionLevel(), added = 0, i = 0;
997 	uint32 intFlags = ctrl_->integrateFlags();
998 	recEnd_         = 0;
999 	if (lbd_) {
1000 		intFlags |= ClauseCreator::clause_int_lbd;
1001 	}
1002 	do {
1003 		ret    = ClauseCreator::integrate(s, received_[i++], intFlags, Constraint_t::Other);
1004 		added += ret.status != ClauseCreator::status_subsumed;
1005 		if (ret.local) { add(ret.local); }
1006 		if (ret.unit()){ s.stats.addIntegratedAsserting(dl, s.decisionLevel()); dl = s.decisionLevel(); }
1007 		if (!ret.ok()) { while (i != rec) { received_[recEnd_++] = received_[i++]; } }
1008 	} while (i != rec);
1009 	s.stats.addIntegrated(added);
1010 	return !s.hasConflict();
1011 }
1012 
add(ClauseHead * h)1013 void ParallelHandler::add(ClauseHead* h) {
1014 	if (intEnd_ < integrated_.size()) {
1015 		ClauseHead* o = (ClauseHead*)integrated_[intEnd_];
1016 		integrated_[intEnd_] = h;
1017 		assert(o);
1018 		if (!ctrl_->integrateUseHeuristic() || o->locked(*solver_) || o->activity().activity() > 0) {
1019 			solver_->addLearnt(o, o->size(), Constraint_t::Other);
1020 		}
1021 		else {
1022 			o->destroy(solver_, true);
1023 			solver_->stats.removeIntegrated();
1024 		}
1025 	}
1026 	else {
1027 		integrated_.push_back(h);
1028 	}
1029 	if (++intEnd_ >= ctrl_->integrateGrace()) {
1030 		intEnd_ = 0;
1031 	}
1032 }
1033 /////////////////////////////////////////////////////////////////////////////////////////
1034 // Distribution
1035 /////////////////////////////////////////////////////////////////////////////////////////
initPeerMask(uint32 id,Integration::Topology topo,uint32 maxT)1036 uint64 ParallelSolveOptions::initPeerMask(uint32 id, Integration::Topology topo, uint32 maxT)  {
1037 	if (topo == Integration::topo_all) { return Distributor::initSet(maxT) ^ Distributor::mask(id); }
1038 	if (topo == Integration::topo_ring){
1039 		uint32 prev = id > 0 ? id - 1 : maxT - 1;
1040 		uint32 next = (id + 1) % maxT;
1041 		return Distributor::mask(prev) | Distributor::mask(next);
1042 	}
1043 	bool ext = topo == Integration::topo_cubex;
1044 	uint32 n = maxT;
1045 	uint32 k = 1;
1046 	for (uint32 i = n / 2; i > 0; i /= 2, k *= 2) { }
1047 	uint64 res = 0, x = 1;
1048 	for (uint32 m = 1; m <= k; m *= 2) {
1049 		uint32 i = m ^ id;
1050 		if      (i < n)         { res |= (x << i);     }
1051 		else if (ext && k != m) { res |= (x << (i^k)); }
1052 	}
1053 	if (ext) {
1054 		uint32 s = k ^ id;
1055 		for(uint32 m = 1; m < k && s >= n; m *= 2) {
1056 			uint32 i = m ^ s;
1057 			if (i < n) { res |= (x << i); }
1058 		}
1059 	}
1060 	assert( (res & (x<<id)) == 0 );
1061 	return res;
1062 }
1063 /////////////////////////////////////////////////////////////////////////////////////////
1064 // GlobalDistribution
1065 /////////////////////////////////////////////////////////////////////////////////////////
GlobalDistribution(const Policy & p,uint32 maxT,uint32 topo)1066 GlobalDistribution::GlobalDistribution(const Policy& p, uint32 maxT, uint32 topo) : Distributor(p), queue_(0) {
1067 	typedef ParallelSolveOptions::Integration::Topology Topology;
1068 	assert(maxT <= ParallelSolveOptions::supportedSolvers());
1069 	Topology t = static_cast<Topology>(topo);
1070 	queue_     = new Queue(maxT);
1071 	threadId_  = (ThreadInfo*)alignedAlloc((maxT * sizeof(ThreadInfo)), 64);
1072 	for (uint32 i = 0; i != maxT; ++i) {
1073 		new (&threadId_[i]) ThreadInfo;
1074 		threadId_[i].id       = queue_->addThread();
1075 		threadId_[i].peerMask = ParallelSolveOptions::initPeerMask(i, t, maxT);
1076 	}
1077 }
~GlobalDistribution()1078 GlobalDistribution::~GlobalDistribution() {
1079 	static_assert(sizeof(ThreadInfo) == 64, "Invalid size");
1080 	release();
1081 }
release()1082 void GlobalDistribution::release() {
1083 	if (queue_) {
1084 		for (uint32 i = 0; i != queue_->maxThreads(); ++i) {
1085 			Queue::ThreadId& id = getThreadId(i);
1086 			for (ClausePair n; queue_->tryConsume(id, n); ) {
1087 				if (n.sender != i) { n.lits->release(); }
1088 			}
1089 			threadId_[i].~ThreadInfo();
1090 		}
1091 		delete queue_;
1092 		queue_ = 0;
1093 		alignedFree(threadId_);
1094 	}
1095 }
publish(const Solver & s,SharedLiterals * n)1096 void GlobalDistribution::publish(const Solver& s, SharedLiterals* n) {
1097 	assert(n->refCount() >= (queue_->maxThreads()-1));
1098 	queue_->publish(ClausePair(s.id(), n), getThreadId(s.id()));
1099 }
receive(const Solver & in,SharedLiterals ** out,uint32 maxn)1100 uint32 GlobalDistribution::receive(const Solver& in, SharedLiterals** out, uint32 maxn) {
1101 	uint32 r = 0;
1102 	Queue::ThreadId& id = getThreadId(in.id());
1103 	uint64 peers = getPeerMask(in.id());
1104 	for (ClausePair n; r != maxn && queue_->tryConsume(id, n); ) {
1105 		if (n.sender != in.id()) {
1106 			if (inSet(peers, n.sender))  { out[r++] = n.lits; }
1107 			else if (n.lits->size() == 1){ out[r++] = n.lits; }
1108 			else                         { n.lits->release(); }
1109 		}
1110 	}
1111 	return r;
1112 }
1113 
1114 /////////////////////////////////////////////////////////////////////////////////////////
1115 // LocalDistribution
1116 /////////////////////////////////////////////////////////////////////////////////////////
LocalDistribution(const Policy & p,uint32 maxT,uint32 topo)1117 LocalDistribution::LocalDistribution(const Policy& p, uint32 maxT, uint32 topo) : Distributor(p), thread_(0), numThread_(0) {
1118 	typedef ParallelSolveOptions::Integration::Topology Topology;
1119 	assert(maxT <= ParallelSolveOptions::supportedSolvers());
1120 	Topology t = static_cast<Topology>(topo);
1121 	thread_    = new ThreadData*[numThread_ = maxT];
1122 	size_t sz  = ((sizeof(ThreadData) + 63) / 64) * 64;
1123 	for (uint32 i = 0; i != maxT; ++i) {
1124 		ThreadData* ti = new (alignedAlloc(sz, 64)) ThreadData;
1125 		ti->received.init(&ti->sentinal);
1126 		ti->peers = ParallelSolveOptions::initPeerMask(i, t, maxT);
1127 		ti->free  = 0;
1128 		thread_[i]= ti;
1129 	}
1130 }
~LocalDistribution()1131 LocalDistribution::~LocalDistribution() {
1132 	while (numThread_) {
1133 		ThreadData* ti      = thread_[--numThread_];
1134 		thread_[numThread_] = 0;
1135 		for (QNode* n; (n = ti->received.pop()) != 0; ) {
1136 			static_cast<SharedLiterals*>(n->data)->release();
1137 		}
1138 		ti->~ThreadData();
1139 		alignedFree(ti);
1140 	}
1141 	for (MPSCPtrQueue::RawNode* n; (n = blocks_.tryPop()) != 0; ) {
1142 		alignedFree(n);
1143 	}
1144 	delete [] thread_;
1145 }
1146 
freeNode(uint32 tId,QNode * n) const1147 void LocalDistribution::freeNode(uint32 tId, QNode* n) const {
1148 	if (n != &thread_[tId]->sentinal) {
1149 		n->next = thread_[tId]->free;
1150 		thread_[tId]->free = n;
1151 	}
1152 }
1153 
allocNode(uint32 tId,SharedLiterals * clause)1154 LocalDistribution::QNode* LocalDistribution::allocNode(uint32 tId, SharedLiterals* clause) {
1155 	for (ThreadData* td = thread_[tId];;) {
1156 		if (QNode* n = td->free) {
1157 			td->free = static_cast<QNode*>(static_cast<MPSCPtrQueue::RawNode*>(n->next));
1158 			n->data  = clause;
1159 			return n;
1160 		}
1161 		// alloc a new block of node;
1162 		const uint32 nNodes = 128;
1163 		QNode* raw = (QNode*)alignedAlloc(sizeof(QNode) * nNodes, 64);
1164 		// add nodes [1, nNodes) to free list
1165 		for (uint32 i = 1; i != nNodes-1; ++i) {
1166 			raw[i].next = &raw[i+1];
1167 		}
1168 		raw[nNodes-1].next = 0;
1169 		td->free = &raw[1];
1170 		// use first node to link to block list
1171 		blocks_.push(raw);
1172 	}
1173 }
1174 
publish(const Solver & s,SharedLiterals * n)1175 void LocalDistribution::publish(const Solver& s, SharedLiterals* n) {
1176 	assert(n->refCount() >= (numThread_-1));
1177 	uint32 sender = s.id();
1178 	uint32 size   = n->size();
1179 	uint32 decRef = 0;
1180 	for (uint32 i = 0; i != numThread_; ++i) {
1181 		if (i == sender) { continue; }
1182 		if (size > 1 && !inSet(thread_[i]->peers, sender)) { ++decRef; }
1183 		else {
1184 			QNode* node = allocNode(sender, n);
1185 			thread_[i]->received.push(node);
1186 		}
1187 	}
1188 	if (decRef) { n->release(decRef); }
1189 }
receive(const Solver & in,SharedLiterals ** out,uint32 maxn)1190 uint32 LocalDistribution::receive(const Solver& in, SharedLiterals** out, uint32 maxn) {
1191 	MPSCPtrQueue& q = thread_[in.id()]->received;
1192 	QNode*        n = 0;
1193 	uint32        r = 0;
1194 	while (r != maxn && (n = q.pop()) != 0) {
1195 		out[r++] = static_cast<SharedLiterals*>(n->data);
1196 		freeNode(in.id(), n);
1197 	}
1198 	return r;
1199 }
1200 
1201 } } // namespace Clasp::mt
1202 
1203