1"""Deals with the socket communication between the PIMD and driver code. 2 3Copyright (C) 2013, Joshua More and Michele Ceriotti 4 5This program is free software: you can redistribute it and/or modify 6it under the terms of the GNU General Public License as published by 7the Free Software Foundation, either version 3 of the License, or 8(at your option) any later version. 9 10This program is distributed in the hope that it will be useful, 11but WITHOUT ANY WARRANTY; without even the implied warranty of 12MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 13GNU General Public License for more details. 14 15You should have received a copy of the GNU General Public License 16along with this program. If not, see <http.//www.gnu.org/licenses/>. 17 18 19Deals with creating the socket, transmitting and receiving data, accepting and 20removing different driver routines and the parallelization of the force 21calculation. 22 23Classes: 24 Status: Simple class to keep track of the status, uses bitwise or to give 25 combinations of different status options. 26 DriverSocket: Class to deal with communication between a client and 27 the driver code. 28 InterfaceSocket: Host server class. Deals with distribution of all the jobs 29 between the different client servers. 30 31Functions: 32 Message: Sends a header string through the socket. 33 34Exceptions: 35 Disconnected: Raised if client has been disconnected. 36 InvalidStatus: Raised if client has the wrong status. Shouldn't have to be 37 used if the structure of the program is correct. 38""" 39 40__all__ = ['InterfaceSocket'] 41 42import numpy as np 43import sys, os 44import socket, select, threading, signal, string, time 45from ipi.utils.depend import depstrip 46from ipi.utils.messages import verbosity, warning, info 47from ipi.utils.softexit import softexit 48 49 50HDRLEN = 12 51UPDATEFREQ = 10 52TIMEOUT = 5.0 53SERVERTIMEOUT = 2.0*TIMEOUT 54NTIMEOUT = 10 55 56def Message(mystr): 57 """Returns a header of standard length HDRLEN.""" 58 59 return string.ljust(string.upper(mystr), HDRLEN) 60 61 62class Disconnected(Exception): 63 """Disconnected: Raised if client has been disconnected.""" 64 65 pass 66 67class InvalidSize(Exception): 68 """Disconnected: Raised if client returns forces with inconsistent number of atoms.""" 69 70 pass 71 72class InvalidStatus(Exception): 73 """InvalidStatus: Raised if client has the wrong status. 74 75 Shouldn't have to be used if the structure of the program is correct. 76 """ 77 78 pass 79 80class Status: 81 """Simple class used to keep track of the status of the client. 82 83 Uses bitwise or to give combinations of different status options. 84 i.e. Status.Up | Status.Ready would be understood to mean that the client 85 was connected and ready to receive the position and cell data. 86 87 Attributes: 88 Disconnected: Flag for if the client has disconnected. 89 Up: Flag for if the client is running. 90 Ready: Flag for if the client has ready to receive position and cell data. 91 NeedsInit: Flag for if the client is ready to receive forcefield 92 parameters. 93 HasData: Flag for if the client is ready to send force data. 94 Busy: Flag for if the client is busy. 95 Timeout: Flag for if the connection has timed out. 96 """ 97 98 Disconnected = 0 99 Up = 1 100 Ready = 2 101 NeedsInit = 4 102 HasData = 8 103 Busy = 16 104 Timeout = 32 105 106 107class DriverSocket(socket.socket): 108 """Deals with communication between the client and driver code. 109 110 Deals with sending and receiving the data from the driver code. Keeps track 111 of the status of the driver. Initializes the driver forcefield, sends the 112 position and cell data, and receives the force data. 113 114 Attributes: 115 _buf: A string buffer to hold the reply from the driver. 116 status: Keeps track of the status of the driver. 117 lastreq: The ID of the last request processed by the client. 118 locked: Flag to mark if the client has been working consistently on one image. 119 """ 120 121 def __init__(self, socket): 122 """Initializes DriverSocket. 123 124 Args: 125 socket: A socket through which the communication should be done. 126 """ 127 128 super(DriverSocket,self).__init__(_sock=socket) 129 self._buf = np.zeros(0,np.byte) 130 self.peername = self.getpeername() 131 self.status = Status.Up 132 self.waitstatus = False 133 self.lastreq = None 134 self.locked = False 135 136 def shutdown(self, how=socket.SHUT_RDWR): 137 138 self.sendall(Message("exit")) 139 self.status = Status.Disconnected 140 super(DriverSocket,self).shutdown(how) 141 142 def poll(self): 143 """Waits for driver status.""" 144 145 self.status = Status.Disconnected # sets disconnected as failsafe status, in case _getstatus fails and exceptions are ignored upstream 146 self.status = self._getstatus() 147 148 def _getstatus(self): 149 """Gets driver status. 150 151 Returns: 152 An integer labelling the status via bitwise or of the relevant members 153 of Status. 154 """ 155 156 if not self.waitstatus: 157 try: 158 readable, writable, errored = select.select([], [self], []) 159 if self in writable: 160 self.sendall(Message("status")) 161 self.waitstatus = True 162 except: 163 return Status.Disconnected 164 165 try: 166 reply = self.recv(HDRLEN) 167 self.waitstatus = False # got status reply 168 except socket.timeout: 169 warning(" @SOCKET: Timeout in status recv!", verbosity.debug ) 170 return Status.Up | Status.Busy | Status.Timeout 171 except: 172 return Status.Disconnected 173 174 if not len(reply) == HDRLEN: 175 return Status.Disconnected 176 elif reply == Message("ready"): 177 return Status.Up | Status.Ready 178 elif reply == Message("needinit"): 179 return Status.Up | Status.NeedsInit 180 elif reply == Message("havedata"): 181 return Status.Up | Status.HasData 182 else: 183 warning(" @SOCKET: Unrecognized reply: " + str(reply), verbosity.low ) 184 return Status.Up 185 186 def recvall(self, dest): 187 """Gets the potential energy, force and virial from the driver. 188 189 Args: 190 dest: Object to be read into. 191 192 Raises: 193 Disconnected: Raised if client is disconnected. 194 195 Returns: 196 The data read from the socket to be read into dest. 197 """ 198 199 blen = dest.itemsize*dest.size 200 if (blen > len(self._buf)): 201 self._buf.resize(blen) 202 bpos = 0 203 ntimeout = 0 204 205 while bpos < blen: 206 timeout = False 207 208# pre-2.5 version. 209 try: 210 bpart = "" 211 bpart = self.recv(blen - bpos) 212 if len(bpart) == 0: raise socket.timeout # There is a problem if this returns no data 213 self._buf[bpos:bpos + len(bpart)] = np.fromstring(bpart, np.byte) 214 except socket.timeout: 215 warning(" @SOCKET: Timeout in status recvall, trying again!", verbosity.low) 216 timeout = True 217 ntimeout += 1 218 if ntimeout > NTIMEOUT: 219 warning(" @SOCKET: Couldn't receive within %5d attempts. Time to give up!" % (NTIMEOUT), verbosity.low) 220 raise Disconnected() 221 pass 222 if (not timeout and bpart == 0): 223 raise Disconnected() 224 bpos += len(bpart) 225 226# post-2.5 version: slightly more compact for modern python versions 227# try: 228# bpart = 1 229# bpart = self.recv_into(self._buf[bpos:], blen-bpos) 230# except socket.timeout: 231# print " @SOCKET: Timeout in status recvall, trying again!" 232# timeout = True 233# pass 234# if (not timeout and bpart == 0): 235# raise Disconnected() 236# bpos += bpart 237#TODO this Disconnected() exception currently just causes the program to hang. 238#This should do something more graceful 239 240 if np.isscalar(dest): 241 return np.fromstring(self._buf[0:blen], dest.dtype)[0] 242 else: 243 return np.fromstring(self._buf[0:blen], dest.dtype).reshape(dest.shape) 244 245 def initialize(self, rid, pars): 246 """Sends the initialization string to the driver. 247 248 Args: 249 rid: The index of the request, i.e. the replica that 250 the force calculation is for. 251 pars: The parameter string to be sent to the driver. 252 253 Raises: 254 InvalidStatus: Raised if the status is not NeedsInit. 255 """ 256 257 if self.status & Status.NeedsInit: 258 try: 259 self.sendall(Message("init")) 260 self.sendall(np.int32(rid)) 261 self.sendall(np.int32(len(pars))) 262 self.sendall(pars) 263 except: 264 self.poll() 265 return 266 else: 267 raise InvalidStatus("Status in init was " + self.status) 268 269 def sendpos(self, pos, cell): 270 """Sends the position and cell data to the driver. 271 272 Args: 273 pos: An array containing the atom positions. 274 cell: A cell object giving the system box. 275 276 Raises: 277 InvalidStatus: Raised if the status is not Ready. 278 """ 279 280 if (self.status & Status.Ready): 281 try: 282 self.sendall(Message("posdata")) 283 self.sendall(cell.h, 9*8) 284 self.sendall(cell.ih, 9*8) 285 self.sendall(np.int32(len(pos)/3)) 286 self.sendall(pos, len(pos)*8) 287 except: 288 self.poll() 289 return 290 else: 291 raise InvalidStatus("Status in sendpos was " + self.status) 292 293 def getforce(self): 294 """Gets the potential energy, force and virial from the driver. 295 296 Raises: 297 InvalidStatus: Raised if the status is not HasData. 298 Disconnected: Raised if the driver has disconnected. 299 300 Returns: 301 A list of the form [potential, force, virial, extra]. 302 """ 303 304 if (self.status & Status.HasData): 305 self.sendall(Message("getforce")); 306 reply = "" 307 while True: 308 try: 309 reply = self.recv(HDRLEN) 310 except socket.timeout: 311 warning(" @SOCKET: Timeout in getforce, trying again!", verbosity.low) 312 continue 313 if reply == Message("forceready"): 314 break 315 else: 316 warning(" @SOCKET: Unexpected getforce reply: %s" % (reply), verbosity.low) 317 if reply == "": 318 raise Disconnected() 319 else: 320 raise InvalidStatus("Status in getforce was " + self.status) 321 322 mu = np.float64() 323 mu = self.recvall(mu) 324 325 mlen = np.int32() 326 mlen = self.recvall(mlen) 327 mf = np.zeros(3*mlen,np.float64) 328 mf = self.recvall(mf) 329 330 mvir = np.zeros((3,3),np.float64) 331 mvir = self.recvall(mvir) 332 333 #! Machinery to return a string as an "extra" field. Comment if you are using a old patched driver that does not return anything! 334 mlen = np.int32() 335 mlen = self.recvall(mlen) 336 if mlen > 0 : 337 mxtra = np.zeros(mlen,np.character) 338 mxtra = self.recvall(mxtra) 339 mxtra = "".join(mxtra) 340 else: 341 mxtra = "" 342 343 #!TODO must set up a machinery to intercept the "extra" return field 344 return [mu, mf, mvir, mxtra] 345 346 347class InterfaceSocket(object): 348 """Host server class. 349 350 Deals with distribution of all the jobs between the different client servers 351 and both initially and as clients either finish or are disconnected. 352 Deals with cleaning up after all calculations are done. Also deals with the 353 threading mechanism, and cleaning up if the interface is killed. 354 355 Attributes: 356 address: A string giving the name of the host network. 357 port: An integer giving the port the socket will be using. 358 slots: An integer giving the maximum allowed backlog of queued clients. 359 mode: A string giving the type of socket used. 360 latency: A float giving the number of seconds the interface will wait 361 before updating the client list. 362 timeout: A float giving a timeout limit for considering a calculation dead 363 and dropping the connection. 364 dopbc: A boolean which decides whether or not to fold the bead positions 365 back into the unit cell before passing them to the client code. 366 server: The socket used for data transmition. 367 clients: A list of the driver clients connected to the server. 368 requests: A list of all the jobs required in the current PIMD step. 369 jobs: A list of all the jobs currently running. 370 _poll_thread: The thread the poll loop is running on. 371 _prev_kill: Holds the signals to be sent to clean up the main thread 372 when a kill signal is sent. 373 _poll_true: A boolean giving whether the thread is alive. 374 _poll_iter: An integer used to decide whether or not to check for 375 client connections. It is used as a counter, once it becomes higher 376 than the pre-defined number of steps between checks the socket will 377 update the list of clients and then be reset to zero. 378 """ 379 380 def __init__(self, address="localhost", port=31415, slots=4, mode="unix", latency=1e-3, timeout=1.0, dopbc=True): 381 """Initializes interface. 382 383 Args: 384 address: An optional string giving the name of the host server. 385 Defaults to 'localhost'. 386 port: An optional integer giving the port number. Defaults to 31415. 387 slots: An optional integer giving the maximum allowed backlog of 388 queueing clients. Defaults to 4. 389 mode: An optional string giving the type of socket. Defaults to 'unix'. 390 latency: An optional float giving the time in seconds the socket will 391 wait before updating the client list. Defaults to 1e-3. 392 timeout: Length of time waiting for data from a client before we assume 393 the connection is dead and disconnect the client. 394 dopbc: A boolean which decides whether or not to fold the bead positions 395 back into the unit cell before passing them to the client code. 396 397 Raises: 398 NameError: Raised if mode is not 'unix' or 'inet'. 399 """ 400 401 self.address = address 402 self.port = port 403 self.slots = slots 404 self.mode = mode 405 self.latency = latency 406 self.timeout = timeout 407 self.dopbc = dopbc 408 self._poll_thread = None 409 self._prev_kill = {} 410 self._poll_true = False 411 self._poll_iter = 0 412 413 def open(self): 414 """Creates a new socket. 415 416 Used so that we can create a interface object without having to also 417 create the associated socket object. 418 """ 419 420 if self.mode == "unix": 421 self.server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) 422 try: 423 self.server.bind("/tmp/ipi_" + self.address) 424 info("Created unix socket with address " + self.address, verbosity.medium) 425 except: 426 raise ValueError("Error opening unix socket. Check if a file " + ("/tmp/ipi_" + self.address) + " exists, and remove it if unused.") 427 428 elif self.mode == "inet": 429 self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 430 self.server.bind((self.address,self.port)) 431 info("Created inet socket with address " + self.address + " and port number " + str(self.port), verbosity.medium) 432 else: 433 raise NameError("InterfaceSocket mode " + self.mode + " is not implemented (should be unix/inet)") 434 435 self.server.listen(self.slots) 436 self.server.settimeout(SERVERTIMEOUT) 437 self.clients = [] 438 self.requests = [] 439 self.jobs = [] 440 441 def close(self): 442 """Closes down the socket.""" 443 444 info(" @SOCKET: Shutting down the driver interface.", verbosity.low ) 445 446 for c in self.clients[:]: 447 if (c.status & Status.Up): 448 c.shutdown(socket.SHUT_RDWR) 449 450 self.server.shutdown(socket.SHUT_RDWR) 451 self.server.close() 452 if self.mode == "unix": 453 os.unlink("/tmp/ipi_" + self.address) 454 455 def queue(self, atoms, cell, pars=None, reqid=0): 456 """Adds a request. 457 458 Note that the pars dictionary need to be sent as a string of a 459 standard format so that the initialization of the driver can be done. 460 461 Args: 462 atoms: An Atoms object giving the atom positions. 463 cell: A Cell object giving the system box. 464 pars: An optional dictionary giving the parameters to be sent to the 465 driver for initialization. Defaults to {}. 466 reqid: An optional integer that identifies requests of the same type, 467 e.g. the bead index 468 469 Returns: 470 A list giving the status of the request of the form {'atoms': Atoms 471 object giving the atom positions, 'cell': Cell object giving the 472 system box, 'pars': parameter string, 'result': holds the result as a 473 list once the computation is done, 'status': a string labelling the 474 status, 'id': the id of the request, usually the bead number, 'start': 475 the starting time for the calculation, used to check for timeouts.}. 476 """ 477 478 par_str = " " 479 480 if not pars is None: 481 for k,v in pars.items(): 482 par_str += k + " : " + str(v) + " , " 483 else: 484 par_str = " " 485 486 # APPLY PBC -- this is useful for codes such as LAMMPS that don't do full PBC when computing distances 487 pbcpos = depstrip(atoms.q).copy() 488 if self.dopbc: 489 cell.array_pbc(pbcpos) 490 491 newreq = {"pos": pbcpos, "cell": cell, "pars": par_str, 492 "result": None, "status": "Queued", "id": reqid, 493 "start": -1 } 494 495 self.requests.append(newreq) 496 return newreq 497 498 def release(self, request): 499 """Empties the list of requests once finished. 500 501 Args: 502 request: A list of requests that are done. 503 """ 504 505 if request in self.requests: 506 self.requests.remove(request) 507 508 def pool_update(self): 509 """Deals with keeping the pool of client drivers up-to-date during a 510 force calculation step. 511 512 Deals with maintaining the client list. Clients that have 513 disconnected are removed and their jobs removed from the list of 514 running jobs and new clients are connected to the server. 515 """ 516 517 for c in self.clients[:]: 518 if not (c.status & Status.Up): 519 try: 520 warning(" @SOCKET: Client " + str(c.peername) +" died or got unresponsive(C). Removing from the list.", verbosity.low) 521 c.shutdown(socket.SHUT_RDWR) 522 c.close() 523 except: 524 pass 525 c.status = Status.Disconnected 526 self.clients.remove(c) 527 for [k,j] in self.jobs[:]: 528 if j is c: 529 self.jobs = [ w for w in self.jobs if not ( w[0] is k and w[1] is j ) ] # removes pair in a robust way 530 #self.jobs.remove([k,j]) 531 k["status"] = "Queued" 532 k["start"] = -1 533 534 keepsearch = True 535 while keepsearch: 536 readable, writable, errored = select.select([self.server], [], [], 0.0) 537 if self.server in readable: 538 client, address = self.server.accept() 539 client.settimeout(TIMEOUT) 540 driver = DriverSocket(client) 541 info(" @SOCKET: Client asked for connection from "+ str( address ) +". Now hand-shaking.", verbosity.low) 542 driver.poll() 543 if (driver.status | Status.Up): 544 self.clients.append(driver) 545 info(" @SOCKET: Handshaking was successful. Added to the client list.", verbosity.low) 546 else: 547 warning(" @SOCKET: Handshaking failed. Dropping connection.", verbosity.low) 548 client.shutdown(socket.SHUT_RDWR) 549 client.close() 550 else: 551 keepsearch = False 552 553 def pool_distribute(self): 554 """Deals with keeping the list of jobs up-to-date during a force 555 calculation step. 556 557 Deals with maintaining the jobs list. Gets data from drivers that have 558 finished their calculation and removes that job from the list of running 559 jobs, adds jobs to free clients and initializes the forcefields of new 560 clients. 561 """ 562 563 for c in self.clients: 564 if c.status == Status.Disconnected : # client disconnected. force a pool_update 565 self._poll_iter = UPDATEFREQ 566 return 567 if not c.status & ( Status.Ready | Status.NeedsInit ): 568 c.poll() 569 570 for [r,c] in self.jobs[:]: 571 if c.status & Status.HasData: 572 try: 573 r["result"] = c.getforce() 574 if len(r["result"][1]) != len(r["pos"]): 575 raise InvalidSize 576 except Disconnected: 577 c.status = Status.Disconnected 578 continue 579 except InvalidSize: 580 warning(" @SOCKET: Client returned an inconsistent number of forces. Will mark as disconnected and try to carry on.", verbosity.low) 581 c.status = Status.Disconnected 582 continue 583 except: 584 warning(" @SOCKET: Client got in a awkward state during getforce. Will mark as disconnected and try to carry on.", verbosity.low) 585 c.status = Status.Disconnected 586 continue 587 c.poll() 588 while c.status & Status.Busy: # waits, but check if we got stuck. 589 if self.timeout > 0 and r["start"] > 0 and time.time() - r["start"] > self.timeout: 590 warning(" @SOCKET: Timeout! HASDATA for bead " + str(r["id"]) + " has been running for " + str(time.time() - r["start"]) + " sec.", verbosity.low) 591 warning(" @SOCKET: Client " + str(c.peername) + " died or got unresponsive(A). Disconnecting.", verbosity.low) 592 try: 593 c.shutdown(socket.SHUT_RDWR) 594 except: 595 pass 596 c.close() 597 c.status = Status.Disconnected 598 continue 599 c.poll() 600 if not (c.status & Status.Up): 601 warning(" @SOCKET: Client died a horrible death while getting forces. Will try to cleanup.", verbosity.low) 602 continue 603 r["status"] = "Done" 604 c.lastreq = r["id"] # saves the ID of the request that the client has just processed 605 self.jobs = [ w for w in self.jobs if not ( w[0] is r and w[1] is c ) ] # removes pair in a robust way 606 607 if self.timeout > 0 and c.status != Status.Disconnected and r["start"] > 0 and time.time() - r["start"] > self.timeout: 608 warning(" @SOCKET: Timeout! Request for bead " + str( r["id"]) + " has been running for " + str(time.time() - r["start"]) + " sec.", verbosity.low) 609 warning(" @SOCKET: Client " + str(c.peername) + " died or got unresponsive(B). Disconnecting.",verbosity.low) 610 try: 611 c.shutdown(socket.SHUT_RDWR) 612 except socket.error: 613 e = sys.exc_info() 614 warning(" @SOCKET: could not shut down cleanly the socket. %s: %s in file '%s' on line %d" % (e[0].__name__, e[1], os.path.basename(e[2].tb_frame.f_code.co_filename), e[2].tb_lineno), verbosity.low ) 615 c.close() 616 c.poll() 617 c.status = Status.Disconnected 618 619 freec = self.clients[:] 620 for [r2, c] in self.jobs: 621 freec.remove(c) 622 623 pendr = self.requests[:] 624 pendr = [ r for r in self.requests if r["status"] == "Queued" ] 625 626 for fc in freec[:]: 627 matched = False 628 # first, makes sure that the client is REALLY free 629 if not (fc.status & Status.Up): 630 self.clients.remove(fc) # if fc is in freec it can't be associated with a job (we just checked for that above) 631 continue 632 if fc.status & Status.HasData: 633 continue 634 if not (fc.status & (Status.Ready | Status.NeedsInit | Status.Busy) ): 635 warning(" @SOCKET: Client " + str(fc.peername) + " is in an unexpected status " + str(fc.status) + " at (1). Will try to keep calm and carry on.", verbosity.low) 636 continue 637 for match_ids in ( "match", "none", "free", "any" ): 638 for r in pendr[:]: 639 if match_ids == "match" and not fc.lastreq is r["id"]: 640 continue 641 elif match_ids == "none" and not fc.lastreq is None: 642 continue 643 elif match_ids == "free" and fc.locked: 644 continue 645 646 info(" @SOCKET: Assigning [%5s] request id %4s to client with last-id %4s (% 3d/% 3d : %s)" % (match_ids, str(r["id"]), str(fc.lastreq), self.clients.index(fc), len(self.clients), str(fc.peername) ), verbosity.high ) 647 648 while fc.status & Status.Busy: 649 fc.poll() 650 if fc.status & Status.NeedsInit: 651 fc.initialize(r["id"], r["pars"]) 652 fc.poll() 653 while fc.status & Status.Busy: # waits for initialization to finish. hopefully this is fast 654 fc.poll() 655 if fc.status & Status.Ready: 656 fc.sendpos(r["pos"], r["cell"]) 657 r["status"] = "Running" 658 r["start"] = time.time() # sets start time for the request 659 fc.poll() 660 self.jobs.append([r,fc]) 661 fc.locked = (fc.lastreq is r["id"]) 662 matched = True 663 # removes r from the list of pending jobs 664 pendr = [nr for nr in pendr if (not nr is r)] 665 break 666 else: 667 warning(" @SOCKET: Client " + str(fc.peername) + " is in an unexpected status " + str(fc.status) + " at (2). Will try to keep calm and carry on.", verbosity.low) 668 if matched: 669 break # doesn't do a second (or third) round if it managed 670 # to assign the job 671 672 def _kill_handler(self, signal, frame): 673 """Deals with handling a kill call gracefully. 674 675 Prevents any of the threads becoming zombies, by intercepting a 676 kill signal using the standard python function signal.signal() and 677 then closing the socket and the spawned threads before closing the main 678 thread. Called when signals SIG_INT and SIG_TERM are received. 679 680 Args: 681 signal: An integer giving the signal number of the signal received 682 from the socket. 683 frame: Current stack frame. 684 """ 685 686 warning(" @SOCKET: Kill signal. Trying to make a clean exit.", verbosity.low) 687 self.end_thread() 688 689 softexit.trigger(" @SOCKET: Kill signal received") 690 691 try: 692 self.__del__() 693 except: 694 pass 695 if signal in self._prev_kill: 696 self._prev_kill[signal](signal, frame) 697 698 def _poll_loop(self): 699 """The main thread loop. 700 701 Runs until either the program finishes or a kill call is sent. Updates 702 the pool of clients every UPDATEFREQ loops and loops every latency 703 seconds until _poll_true becomes false. 704 """ 705 706 info(" @SOCKET: Starting the polling thread main loop.", verbosity.low) 707 self._poll_iter = UPDATEFREQ 708 while self._poll_true: 709 time.sleep(self.latency) 710 # makes sure to remove the last dead client as soon as possible -- and to get clients if we are dry 711 if self._poll_iter >= UPDATEFREQ or len(self.clients)==0 or (len(self.clients) > 0 and not(self.clients[0].status & Status.Up)): 712 self.pool_update() 713 self._poll_iter = 0 714 self._poll_iter += 1 715 self.pool_distribute() 716 717 if os.path.exists("EXIT"): # softexit 718 info(" @SOCKET: Soft exit request from file EXIT. Flushing job queue.", verbosity.low) 719 # releases all pending requests 720 for r in self.requests: 721 r["status"] = "Exit" 722 for c in self.clients: 723 try: 724 c.shutdown(socket.SHUT_RDWR) 725 c.close() 726 except: 727 pass 728 # flush it all down the drain 729 self.clients = [] 730 self.jobs = [] 731 self._poll_thread = None 732 733 def started(self): 734 """Returns a boolean specifying whether the thread has started yet.""" 735 736 return (not self._poll_thread is None) 737 738 def start_thread(self): 739 """Spawns a new thread. 740 741 Splits the main program into two threads, one that runs the polling loop 742 which updates the client list, and one which gets the data. Also sets up 743 the machinery to deal with a kill call, in the case of a Ctrl-C or 744 similar signal the signal is intercepted by the _kill_handler function, 745 which cleans up the spawned thread before closing the main thread. 746 747 Raises: 748 NameError: Raised if the polling thread already exists. 749 """ 750 751 self.open() 752 if not self._poll_thread is None: 753 raise NameError("Polling thread already started") 754 self._poll_thread = threading.Thread(target=self._poll_loop, name="poll_" + self.address) 755 self._poll_thread.daemon = True 756 self._prev_kill[signal.SIGINT] = signal.signal(signal.SIGINT, self._kill_handler) 757 self._prev_kill[signal.SIGTERM] = signal.signal(signal.SIGTERM, self._kill_handler) 758 self._poll_true = True 759 self._poll_thread.start() 760 761 def end_thread(self): 762 """Closes the spawned thread. 763 764 Deals with cleaning up the spawned thread cleanly. First sets 765 _poll_true to false to indicate that the poll_loop should be exited, then 766 closes the spawned thread and removes it. 767 """ 768 769 self._poll_true = False 770 if not self._poll_thread is None: 771 self._poll_thread.join() 772 self._poll_thread = None 773 self.close() 774