'''
Functions and classes related to solving.

Examples
--------

The following example shows how to intercept models with a callback:

    >>> from clingo import Control
    >>>
    >>> ctl = Control(["0"])
    >>> ctl.add("base", [], "1 { a; b } 1.")
    >>> ctl.ground([("base", [])])
    >>> print(ctl.solve(on_model=print))
    a
    b
    SAT

The following example shows how to yield models:

    >>> from clingo import Control
    >>>
    >>> ctl = Control(["0"])
    >>> ctl.add("base", [], "1 { a; b } 1.")
    >>> ctl.ground([("base", [])])
    >>> with ctl.solve(yield_=True) as hnd:
    ...     for m in hnd:
    ...         print(m)
    ...     print(hnd.get())
    ...
    a
    b
    SAT

The following example shows how to solve asynchronously:

    >>> from clingo import Control
    >>>
    >>> ctl = Control(["0"])
    >>> ctl.add("base", [], "1 { a; b } 1.")
    >>> ctl.ground([("base", [])])
    >>> with ctl.solve(on_model=print, async_=True) as hnd:
    ...     # some computation here
    ...     hnd.wait()
    ...     print(hnd.get())
    ...
    a
    b
    SAT

This example shows how to solve both iteratively and asynchronously:

    >>> from clingo import Control
    >>>
    >>> ctl = Control(["0"])
    >>> ctl.configuration.solve.models = 0
    >>> ctl.add("base", [], "1 { a; b } 1.")
    >>> ctl.ground([("base", [])])
    >>> with ctl.solve(yield_=True, async_=True) as hnd:
    ...     while True:
    ...         hnd.resume()
    ...         # some computation here
    ...         _ = hnd.wait()
    ...         m = hnd.model()
    ...         if m is None:
    ...             print(hnd.get())
    ...             break
    ...         print(m)
    b
    a
    SAT
'''

from typing import ContextManager, Iterator, List, Optional, Sequence, Tuple, Union
from enum import Enum

from ._internal import _c_call, _c_call2, _ffi, _handle_error, _lib
from .util import Slice, SlicedSequence
from .symbol import Symbol
from .symbolic_atoms import SymbolicAtoms

__all__ = [ 'Model', 'ModelType', 'SolveControl', 'SolveHandle', 'SolveResult' ]

class SolveResult:
    '''
    Captures the result of a solve call.

    The result is stored as a bitset; the individual properties test the
    corresponding bits of the underlying library.
    '''
    def __init__(self, rep):
        # rep: bitset as returned by the underlying solve handle
        self._rep = rep

    @property
    def exhausted(self) -> bool:
        '''
        Determine if the search space was exhausted.
        '''
        return (_lib.clingo_solve_result_exhausted & self._rep) != 0

    @property
    def interrupted(self) -> bool:
        '''
        Determine if the search was interrupted.
        '''
        return (_lib.clingo_solve_result_interrupted & self._rep) != 0

    @property
    def satisfiable(self) -> Optional[bool]:
        '''
        `True` if the problem is satisfiable, `False` if the problem is
        unsatisfiable, or `None` if the satisfiability is not known.
        '''
        if (_lib.clingo_solve_result_satisfiable & self._rep) != 0:
            return True
        if (_lib.clingo_solve_result_unsatisfiable & self._rep) != 0:
            return False
        return None

    @property
    def unknown(self) -> bool:
        '''
        Determine if the satisfiability is not known.

        This is equivalent to `satisfiable is None`.
        '''
        return self.satisfiable is None

    @property
    def unsatisfiable(self) -> Optional[bool]:
        '''
        `True` if the problem is unsatisfiable, `False` if the problem is
        satisfiable, or `None` if the satisfiability is not known.
        '''
        if (_lib.clingo_solve_result_unsatisfiable & self._rep) != 0:
            return True
        if (_lib.clingo_solve_result_satisfiable & self._rep) != 0:
            return False
        return None

    def __str__(self) -> str:
        # one of "SAT", "UNSAT", or "UNKNOWN"
        if self.satisfiable:
            return "SAT"
        if self.unsatisfiable:
            return "UNSAT"
        return "UNKNOWN"

    def __repr__(self) -> str:
        return f"SolveResult({self._rep})"

class SolveControl:
    '''
    Object that allows for controlling a running search.
    '''
    def __init__(self, rep):
        # rep: low-level solve control handle of the underlying library
        self._rep = rep

    def add_clause(self, literals: Sequence[Union[Tuple[Symbol,bool],int]]) -> None:
        '''
        Add a clause that applies to the current solving step during the search.

        Parameters
        ----------
        literals
            List of literals either represented as pairs of symbolic atoms and
            Booleans or as program literals.

        Notes
        -----
        This function can only be called in a model callback or while iterating
        when using a `SolveHandle`.
        '''
        atoms = self.symbolic_atoms
        p_lits = _ffi.new('clingo_literal_t[]', len(literals))
        for i, lit in enumerate(literals):
            if isinstance(lit, int):
                # program literals are passed through unchanged
                p_lits[i] = lit
            else:
                # (symbol, sign) pairs are translated via the symbolic atoms
                atom = atoms[lit[0]]
                if atom is not None:
                    slit = atom.literal
                else:
                    # NOTE(review): fallback for atoms unknown to the program;
                    # presumably -1 denotes a literal that is always false -
                    # confirm against the clingo API
                    slit = -1
                p_lits[i] = slit if lit[1] else -slit

        _handle_error(_lib.clingo_solve_control_add_clause(self._rep, p_lits, len(literals)))

    def _invert(self, lit: Union[Tuple[Symbol,bool],int]) -> Union[Tuple[Symbol,bool],int]:
        '''
        Return the negation of the given literal.

        Program literals are negated arithmetically; for pairs the Boolean
        sign is flipped.
        '''
        if isinstance(lit, int):
            return -lit
        return lit[0], not lit[1]

    def add_nogood(self, literals: Sequence[Union[Tuple[Symbol,bool],int]]) -> None:
        '''
        Equivalent to `SolveControl.add_clause` with the literals inverted.

        Parameters
        ----------
        literals
            List of literals either represented as pairs of symbolic atoms and
            Booleans or as program literals.
        '''
        self.add_clause([self._invert(lit) for lit in literals])

    @property
    def symbolic_atoms(self) -> SymbolicAtoms:
        '''
        `clingo.symbolic_atoms.SymbolicAtoms` object to inspect the symbolic atoms.
        '''
        atoms = _c_call('clingo_symbolic_atoms_t*', _lib.clingo_solve_control_symbolic_atoms, self._rep)
        return SymbolicAtoms(atoms)

class ModelType(Enum):
    '''
    Enumeration of the different types of models.
    '''
    BraveConsequences = _lib.clingo_model_type_brave_consequences
    '''
    The model stores the set of brave consequences.
    '''
    CautiousConsequences = _lib.clingo_model_type_cautious_consequences
    '''
    The model stores the set of cautious consequences.
    '''
    StableModel = _lib.clingo_model_type_stable_model
    '''
    The model captures a stable model.
    '''

class _SymbolSequence(Sequence[Symbol]):
    '''
    Helper class to efficiently store sequences of symbols.

    The sequence keeps the low-level symbol array and only wraps elements
    into `Symbol` objects when they are accessed.
    '''
    def __init__(self, p_symbols):
        # p_symbols: array of low-level symbol representations
        self._p_symbols = p_symbols

    def __len__(self) -> int:
        return len(self._p_symbols)

    def __getitem__(self, slc):
        # slices yield a lazy view; integers support negative indexing
        if isinstance(slc, slice):
            return SlicedSequence(self, Slice(slc))
        if slc < 0:
            slc += len(self)
        if slc < 0 or slc >= len(self):
            raise IndexError('invalid index')
        return Symbol(self._p_symbols[slc])

    def __iter__(self) -> Iterator[Symbol]:
        for i in range(len(self)):
            yield Symbol(self._p_symbols[i])

    def __str__(self) -> str:
        return f'[{", ".join(str(sym) for sym in self)}]'

    def __repr__(self) -> str:
        return f'[{", ".join(repr(sym) for sym in self)}]'

class Model:
    '''
    Provides access to a model during a solve call and provides a
    `SolveControl` object to influence the running search.

    Notes
    -----
    The string representation of a model object is similar to the output of
    models by clingo using the default output.

    `Model` objects cannot be constructed from Python. Instead they are obtained
    during solving (see `Control.solve`). Furthermore, the lifetime of a model
    object is limited to the scope of the callback it was passed to or until
    the search for the next model is started. They must not be stored for later
    use.
    '''
    def __init__(self, rep):
        # rep: low-level model handle of the underlying library
        self._rep = rep

    def contains(self, atom: Symbol) -> bool:
        '''
        Efficiently check if an atom is contained in the model.

        Parameters
        ----------
        atom
            The atom to lookup.

        Returns
        -------
        Whether the given atom is contained in the model.

        Notes
        -----
        The atom must be represented using a function symbol.
        '''
        # pylint: disable=protected-access
        return _c_call('bool', _lib.clingo_model_contains, self._rep, atom._rep)

    def extend(self, symbols: Sequence[Symbol]) -> None:
        '''
        Extend a model with the given symbols.

        Parameters
        ----------
        symbols
            The symbols to add to the model.

        Notes
        -----
        This only has an effect if there is an underlying clingo application,
        which will print the added symbols.
        '''
        # pylint: disable=protected-access
        c_symbols = _ffi.new('clingo_symbol_t[]', len(symbols))
        for i, sym in enumerate(symbols):
            c_symbols[i] = sym._rep
        _handle_error(_lib.clingo_model_extend(self._rep, c_symbols, len(symbols)))

    def is_true(self, literal: int) -> bool:
        '''
        Check if the given program literal is true.

        Parameters
        ----------
        literal
            The given program literal.

        Returns
        -------
        Whether the given program literal is true.
        '''
        return _c_call('bool', _lib.clingo_model_is_true, self._rep, literal)

    def symbols(self, atoms: bool=False, terms: bool=False, shown: bool=False, csp: bool=False,
                theory: bool=False, complement: bool=False) -> Sequence[Symbol]:
        '''
        Return the list of atoms, terms, or CSP assignments in the model.

        Parameters
        ----------
        atoms
            Select all atoms in the model (independent of `#show` statements).
        terms
            Select all terms displayed with `#show` statements in the model.
        shown
            Select all atoms and terms as outputted by clingo.
        csp
            Select all csp assignments (independent of `#show` statements).
        theory
            Select atoms added with `Model.extend`.
        complement
            Return the complement of the answer set w.r.t. the atoms known
            to the grounder. (Does not affect csp assignments.)

        Returns
        -------
        The selected symbols.

        Notes
        -----
        Atoms are represented using functions (`Symbol` objects), and CSP
        assignments are represented using functions with name `"$"` where the
        first argument is the name of the CSP variable and the second its
        value.
        '''
        # combine the selected categories into one bitset
        show = 0
        if atoms:
            show |= _lib.clingo_show_type_atoms
        if terms:
            show |= _lib.clingo_show_type_terms
        if shown:
            show |= _lib.clingo_show_type_shown
        if csp:
            show |= _lib.clingo_show_type_csp
        if theory:
            show |= _lib.clingo_show_type_theory
        if complement:
            show |= _lib.clingo_show_type_complement

        # query the number of symbols first to allocate a matching buffer
        size = _c_call('size_t', _lib.clingo_model_symbols_size, self._rep, show)

        p_symbols = _ffi.new('clingo_symbol_t[]', size)
        _handle_error(_lib.clingo_model_symbols(self._rep, show, p_symbols, size))

        return _SymbolSequence(p_symbols)

    def __str__(self) -> str:
        # space separated list of the shown symbols
        return " ".join(map(str, self.symbols(shown=True)))

    def __repr__(self) -> str:
        return f'Model({self._rep!r})'

    @property
    def context(self) -> SolveControl:
        '''
        Object that allows for controlling the running search.
        '''
        ctl = _c_call('clingo_solve_control_t*', _lib.clingo_model_context, self._rep)
        return SolveControl(ctl)

    @property
    def cost(self) -> List[int]:
        '''
        Return the list of integer cost values of the model.

        The return values correspond to clasp's cost output.
        '''
        # query the number of cost values first to allocate a matching buffer
        size = _c_call('size_t', _lib.clingo_model_cost_size, self._rep)

        p_costs = _ffi.new('int64_t[]', size)
        _handle_error(_lib.clingo_model_cost(self._rep, p_costs, size))

        return list(p_costs)

    @property
    def number(self) -> int:
        '''
        The running number of the model.
        '''
        return _c_call('uint64_t', _lib.clingo_model_number, self._rep)

    @property
    def optimality_proven(self) -> bool:
        '''
        Whether the optimality of the model has been proven.
        '''
        return _c_call('bool', _lib.clingo_model_optimality_proven, self._rep)

    @property
    def thread_id(self) -> int:
        '''
        The id of the thread which found the model.
        '''
        return _c_call('clingo_id_t', _lib.clingo_model_thread_id, self._rep)

    @property
    def type(self) -> ModelType:
        '''
        The type of the model.
        '''
        return ModelType(_c_call('clingo_model_type_t', _lib.clingo_model_type, self._rep))

class SolveHandle(ContextManager['SolveHandle']):
    '''
    Handle for solve calls.

    They can be used to control solving, like, retrieving models or cancelling
    a search.

    See Also
    --------
    Control.solve

    Notes
    -----
    A `SolveHandle` is a context manager and must be used with Python's `with`
    statement.

    Blocking functions in this object release the GIL. They are not thread-safe
    though.
    '''
    def __init__(self, rep, handler):
        # rep: low-level solve handle of the underlying library
        # handler: passed along to the error handling helpers on each call
        self._rep = rep
        self._handler = handler

    def __iter__(self) -> Iterator[Model]:
        # resume the search and yield models until none is left
        while True:
            self.resume()
            m = self.model()
            if m is None:
                break
            yield m

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # close the handle; exceptions raised in the with block propagate
        _handle_error(_lib.clingo_solve_handle_close(self._rep), self._handler)
        return False

    def cancel(self) -> None:
        '''
        Cancel the running search.

        See Also
        --------
        clingo.control.Control.interrupt
        '''
        _handle_error(_lib.clingo_solve_handle_cancel(self._rep), self._handler)

    def core(self) -> List[int]:
        '''
        The subset of assumptions that made the problem unsatisfiable.
        '''
        core, size = _c_call2('clingo_literal_t*', 'size_t', _lib.clingo_solve_handle_core,
                              self._rep, handler=self._handler)
        # copy the literals out of the array provided by the library
        return [core[i] for i in range(size)]

    def get(self) -> SolveResult:
        '''
        Get the result of a solve call.

        If the search is not completed yet, the function blocks until the
        result is ready.
        '''
        res = _c_call('clingo_solve_result_bitset_t', _lib.clingo_solve_handle_get, self._rep, handler=self._handler)
        return SolveResult(res)

    def model(self) -> Optional[Model]:
        '''
        Get the current model if there is any.

        Returns
        -------
        The current model or `None` if there is no model.
        '''
        p_model = _ffi.new('clingo_model_t**')
        _handle_error(
            _lib.clingo_solve_handle_model(self._rep, p_model),
            self._handler)
        if p_model[0] == _ffi.NULL:
            return None
        return Model(p_model[0])

    def resume(self) -> None:
        '''
        Discards the last model and starts searching for the next one.

        Notes
        -----
        If the search has been started asynchronously, this function starts the
        search in the background.
        '''
        _handle_error(_lib.clingo_solve_handle_resume(self._rep), self._handler)

    def wait(self, timeout: Optional[float]=None) -> bool:
        '''
        Wait for solve call to finish or the next result with an optional timeout.

        If a timeout is given, the behavior of the function changes depending
        on the sign of the timeout. If a positive timeout is given, the function
        blocks for the given amount of time or until a result is ready. If the
        timeout is negative, the function will block until a result is ready,
        which also corresponds to the behavior of the function if no timeout is
        given. A timeout of zero can be used to poll if a result is ready.

        Parameters
        ----------
        timeout
            If a timeout is given, the function blocks for at most timeout seconds.

        Returns
        -------
        Indicates whether the solve call has finished or the next result is ready.
        '''
        p_res = _ffi.new('bool*')
        # NOTE(review): the return value is not passed to _handle_error here;
        # presumably the C function does not report errors - confirm
        _lib.clingo_solve_handle_wait(self._rep, -1 if timeout is None else timeout, p_res)
        return p_res[0]