# -*- coding: utf-8 -*-
'''
stacking.py raet protocol stacking classes
'''
# pylint: skip-file
# pylint: disable=W0611

# Import python libs
import socket
import os
import errno
import sys
if sys.version_info > (3,):
    long = int

from collections import deque
try:
    from collections.abc import Mapping  # removed from `collections` in Python 3.10
except ImportError:  # Python 2
    from collections import Mapping
try:
    import simplejson as json
except ImportError:
    import json

try:
    import msgpack
except ImportError:
    msgpack = None

# Import ioflo libs
from ioflo.aid.odicting import odict
from ioflo.aid.timing import StoreTimer
from ioflo.base.storing import Store

# Import raet libs
from .abiding import *  # import globals
from . import raeting
from . import keeping
from . import lotting

from ioflo.base.consoling import getConsole
console = getConsole()


class Stack(object):
    '''
    RAET protocol base stack object.
    Should be subclassed for specific transport type such as UDP or UXD

    Holds the local lot, the remotes indexed by uid and by name, the server
    used for socket io and the receive/transmit deques that the service
    methods move data through.
    '''
    Count = 0
    Uid = 0  # base for next unique id for local and remotes

    def __init__(self,
                 store=None,
                 version=raeting.VERSION,
                 main=None,
                 puid=None,
                 local=None,  # passed up from subclass
                 name='',
                 uid=None,
                 server=None,
                 ha=None,
                 bufcnt=2,
                 rxMsgs=None,
                 txMsgs=None,
                 rxes=None,
                 txes=None,
                 stats=None,
                 ):
        '''
        Setup Stack instance

        Parameters:
            store is the ioflo Store used for timing, a new one is made if falsy
            version is stored as .version
            main is stored as .main
            puid is the previous uid that .nextUid increments from,
                defaults to class attribute Uid
            local is the local lot, if falsy one is created from name, uid, ha
            server is the io server, if falsy .serverFromLocal() is used
            bufcnt is stored as .bufcnt
            rxMsgs, txMsgs, rxes, txes are deques to use, new ones if None
            stats is the statistics mapping to use, new odict if None

        Raises raeting.StackError if there is a server and it fails to reopen
        '''
        self.store = store or Store(stamp=0.0)

        self.version = version
        self.main = main

        # a subclass may have already assigned .puid before calling up
        if getattr(self, 'puid', None) is None:
            self.puid = puid if puid is not None else self.Uid

        self.local = local or lotting.Lot(stack=self,
                                          name=name,
                                          uid=uid,
                                          ha=ha,)
        self.local.stack = self

        self.remotes = self.uidRemotes = odict()  # remotes indexed by uid
        self.nameRemotes = odict()  # remotes indexed by name

        self.bufcnt = bufcnt
        if not server:
            server = self.serverFromLocal()

        self.server = server
        if self.server:
            if not self.server.reopen():  # open socket
                raise raeting.StackError("Stack '{0}': Failed opening server at"
                                         " '{1}'\n".format(self.name, self.server.ha))

            self.ha = self.server.ha  # update local host address after open

            console.verbose("Stack '{0}': Opened server at '{1}'\n".format(self.name,
                                                                           self.ha))

        self.rxMsgs = rxMsgs if rxMsgs is not None else deque()  # messages received
        self.txMsgs = txMsgs if txMsgs is not None else deque()  # messages to transmit
        self.rxes = rxes if rxes is not None else deque()  # udp packets received
        self.txes = txes if txes is not None else deque()  # udp packet to transmit
        self.stats = stats if stats is not None else odict()  # udp statistics
        self.statTimer = StoreTimer(self.store)

    @property
    def name(self):
        '''
        property that returns name of local interface
        '''
        return self.local.name

    @name.setter
    def name(self, value):
        '''
        setter for name property
        '''
        self.local.name = value

    @property
    def ha(self):
        '''
        property that returns host address
        '''
        return self.local.ha

    @ha.setter
    def ha(self, value):
        '''
        setter for ha property
        '''
        self.local.ha = value

    def serverFromLocal(self):
        '''
        Create server from local data
        Base class has no transport so returns None, subclasses override
        '''
        return None

    def nextUid(self):
        '''
        Generates next unique id number for local or remotes.
        Increments .puid and rolls over to 1 once it exceeds 0xffffffff
        '''
        self.puid += 1
        if self.puid > long(0xffffffff):
            self.puid = 1  # rollover to 1
        return self.puid

    def addRemote(self, remote):
        '''
        Add a remote to indexes
        Returns the remote

        Raises raeting.StackError if remote.uid or remote.name is already in
        use by another remote or by the local lot
        '''
        uid = remote.uid
        # allow for condition where local.uid == 0 and remote.uid == 0
        if uid in self.uidRemotes or (uid and uid == self.local.uid):
            emsg = "Cannot add remote at uid '{0}', alreadys exists".format(uid)
            raise raeting.StackError(emsg)
        if remote.name in self.nameRemotes or remote.name == self.local.name:
            emsg = "Cannot add remote at name '{0}', alreadys exists".format(remote.name)
            raise raeting.StackError(emsg)
        remote.stack = self
        self.uidRemotes[uid] = remote
        self.nameRemotes[remote.name] = remote
        return remote

    def moveRemote(self, remote, new):
        '''
        Move remote at key remote.uid to new uid and replace the odict key index
        so order is the same

        Raises raeting.StackError if new is already in use, if remote.uid is
        not indexed, or if the indexed remote is not this remote object
        '''
        old = remote.uid

        if new in self.uidRemotes or new == self.local.uid:
            emsg = "Cannot move, remote to '{0}', already exists".format(new)
            raise raeting.StackError(emsg)

        if old not in self.uidRemotes:
            emsg = "Cannot move remote at '{0}', does not exist".format(old)
            raise raeting.StackError(emsg)

        if remote is not self.uidRemotes[old]:
            emsg = "Cannot move remote at '{0}', not identical".format(old)
            raise raeting.StackError(emsg)

        remote.uid = new
        # list() so the lookup works whether keys() is a list or a view
        index = list(self.uidRemotes.keys()).index(old)
        del self.uidRemotes[old]
        self.uidRemotes.insert(index, new, remote)

    def renameRemote(self, remote, new):
        '''
        rename remote with old remote.name to new name but keep same index
        Does nothing when new equals the current name

        Raises raeting.StackError if new is already in use, if the old name
        is not indexed, or if the indexed remote is not this remote object
        '''
        old = remote.name
        if new != old:
            if new in self.nameRemotes or new == self.local.name:
                emsg = "Cannot rename remote to '{0}', already exists".format(new)
                raise raeting.StackError(emsg)

            if old not in self.nameRemotes:
                emsg = "Cannot rename remote '{0}', does not exist".format(old)
                raise raeting.StackError(emsg)

            if remote is not self.nameRemotes[old]:
                emsg = "Cannot rename remote '{0}', not identical".format(old)
                raise raeting.StackError(emsg)

            remote.name = new
            # list() so the lookup works whether keys() is a list or a view
            index = list(self.nameRemotes.keys()).index(old)
            del self.nameRemotes[old]
            self.nameRemotes.insert(index, new, remote)

    def removeRemote(self, remote):
        '''
        Remove remote from all remotes dicts

        Raises raeting.StackError if remote.uid is not indexed or the indexed
        remote is not this remote object
        '''
        uid = remote.uid
        if uid not in self.uidRemotes:
            emsg = "Cannot remove remote '{0}', does not exist".format(uid)
            raise raeting.StackError(emsg)

        if remote is not self.uidRemotes[uid]:
            emsg = "Cannot remove remote '{0}', not identical".format(uid)
            raise raeting.StackError(emsg)

        del self.uidRemotes[uid]
        del self.nameRemotes[remote.name]

    def removeAllRemotes(self):
        '''
        Remove all the remotes
        '''
        # real copy since .remotes is changed in-place while looping
        remotes = list(self.remotes.values())
        for remote in remotes:
            self.removeRemote(remote)

    def fetchUidByName(self, name):
        '''
        Search for remote with matching name
        Return uid of remote if found Otherwise return None
        '''
        remote = self.nameRemotes.get(name)
        return (remote.uid if remote is not None else None)

    def incStat(self, key, delta=1):
        '''
        Increment stat key counter by delta
        Creates the counter with value delta if it does not exist yet
        '''
        if key in self.stats:
            self.stats[key] += delta
        else:
            self.stats[key] = delta

    def updateStat(self, key, value):
        '''
        Set stat key to value
        '''
        self.stats[key] = value

    def clearStat(self, key):
        '''
        Set the specified state counter to zero
        Unknown keys are ignored
        '''
        if key in self.stats:
            self.stats[key] = 0

    def clearStats(self):
        '''
        Set all the stat counters to zero and reset the timer
        '''
        for key in list(self.stats.keys()):
            self.stats[key] = 0
        self.statTimer.restart()

    def _handleOneReceived(self):
        '''
        Handle one received message from server
        assumes that there is a server

        Returns True if a packet was appended to .rxes
        Returns False if nothing was received or the connection was reset
        Any other socket error is re-raised
        '''
        try:
            rx, ra = self.server.receive()  # if no data the duple is ('',None)
        except socket.error as ex:
            err = raeting.get_exception_error(ex)
            if err == errno.ECONNRESET:
                return False
            raise  # rx is unbound here so do not fall through
        if not rx:  # no received data
            return False
        self.rxes.append((rx, ra))  # duple = ( packet, source address)
        return True

    def serviceReceives(self):
        '''
        Retrieve from server all received and put on the rxes deque
        '''
        if self.server:
            while self._handleOneReceived():
                pass

    def serviceReceiveOnce(self):
        '''
        Retrieve from server one received and put on the rxes deque
        '''
        if self.server:
            self._handleOneReceived()

    def _handleOneRx(self):
        '''
        Handle one message from .rxes deque
        Assumes that there is a message on the .rxes deque
        '''
        raw, sa = self.rxes.popleft()
        console.verbose("{0} received raw message\n{1}\n".format(self.name, raw))
        self.processRx(packet=raw)

    def serviceRxes(self):
        '''
        Process all messages in .rxes deque
        '''
        while self.rxes:
            self._handleOneRx()

    def serviceRxOnce(self):
        '''
        Process one message in .rxes deque
        '''
        if self.rxes:
            self._handleOneRx()

    def processRx(self, packet):
        '''
        Process one received raw packet
        Base class does nothing, subclasses override
        '''
        pass

    def transmit(self, msg, uid=None):
        '''
        Append duple (msg, uid) to .txMsgs deque
        If msg is not mapping then logs it, increments the
        "invalid_transmit_body" stat and returns without queuing
        If uid is None then it will default to the first entry in .remotes
        If uid is None and there are no remotes then logs it, increments the
        "invalid_destination" stat and returns without queuing
        '''
        if not isinstance(msg, Mapping):
            emsg = "Invalid msg, not a mapping {0}\n".format(msg)
            console.terse(emsg)
            self.incStat("invalid_transmit_body")
            return
        if uid is None:
            if not self.remotes:
                emsg = "No remote to send to\n"
                console.terse(emsg)
                self.incStat("invalid_destination")
                return
            # first remote, works whether values() is a list or a view
            uid = next(iter(self.remotes.values())).uid
        self.txMsgs.append((msg, uid))

    def _handleOneTxMsg(self):
        '''
        Take one message from .txMsgs deque and handle it
        Assumes there is a message on the deque
        '''
        body, uid = self.txMsgs.popleft()  # duple (body dict, destination uid)
        self.message(body, uid=uid)
        console.verbose("{0} sending\n{1}\n".format(self.name, body))

    def serviceTxMsgs(self):
        '''
        Service .txMsgs queue of outgoing messages
        '''
        while self.txMsgs:
            self._handleOneTxMsg()

    def serviceTxMsgOnce(self):
        '''
        Service one message on .txMsgs queue of outgoing messages
        '''
        if self.txMsgs:
            self._handleOneTxMsg()

    def message(self, body, uid=None):
        '''
        Sends message body to remote at uid
        Base class does nothing, subclasses override
        '''
        pass

    def tx(self, packed, duid):
        '''
        Queue duple of (packed, da) on stack .txes queue
        Where da is the ip destination (host,port) address associated with
        the remote identified by duid

        Raises raeting.StackError if duid is not in .remotes
        '''
        if duid not in self.remotes:
            msg = "Invalid destination remote id '{0}'".format(duid)
            raise raeting.StackError(msg)
        self.txes.append((packed, self.remotes[duid].ha))

    def _handleOneTx(self, laters, blocks):
        '''
        Handle one message on .txes deque
        Assumes there is a message
        laters is deque of messages to try again later
        blocks is list of destinations that already blocked on this service

        Socket errors not in the retry list below are re-raised
        '''
        tx, ta = self.txes.popleft()  # duple = (packet, destination address)

        if ta in blocks:  # already blocked on this iteration
            laters.append((tx, ta))  # keep sequential
            return

        try:
            self.server.send(tx, ta)
        except socket.error as ex:
            err = raeting.get_exception_error(ex)
            errors = [errno.EAGAIN,
                      errno.EWOULDBLOCK,
                      errno.ENETUNREACH,
                      errno.EHOSTUNREACH,
                      errno.EHOSTDOWN,
                      errno.ECONNRESET]
            if hasattr(errno, 'ETIME'):  # not defined on every platform
                errors.append(errno.ETIME)
            if (err in errors):
                # problem sending such as busy with last message. save it for later
                laters.append((tx, ta))
                blocks.append(ta)
            else:
                raise

    def serviceTxes(self):
        '''
        Service the .txes deque to send messages through server
        Messages that could not be sent are put back on .txes in order
        '''
        if self.server:
            laters = deque()
            blocks = []
            while self.txes:
                self._handleOneTx(laters, blocks)
            while laters:
                self.txes.append(laters.popleft())

    def serviceTxOnce(self):
        '''
        Service one message on the .txes deque to send through server
        '''
        if self.server:
            laters = deque()
            blocks = []  # will always be empty since only once
            if self.txes:
                self._handleOneTx(laters, blocks)
            while laters:
                self.txes.append(laters.popleft())

    def serviceAllRx(self):
        '''
        Service:
           server receive
           rxes queue
           process
        '''
        self.serviceReceives()
        self.serviceRxes()
        self.process()

    def serviceAllTx(self):
        '''
        Service:
           txMsgs queue
           txes queue to server send
        '''
        self.serviceTxMsgs()
        self.serviceTxes()

    def serviceAll(self):
        '''
        Service or Process:
           server receive
           rxes queue
           process
           txMsgs queue
           txes queue to server send
        '''
        self.serviceAllRx()
        self.serviceAllTx()

    def serviceServer(self):
        '''
        Service the server's receive and transmit queues
        '''
        self.serviceReceives()
        self.serviceTxes()

    def serviceOneAllRx(self):
        '''
        Propagate one packet all the way through the received side of the stack
        Service:
           server receive
           rxes queue
           process
        '''
        self.serviceReceiveOnce()
        self.serviceRxOnce()
        self.process()

    def serviceOneAllTx(self):
        '''
        Propagate one packet all the way through the transmit side of the stack
        Service:
           txMsgs queue
           txes queue to server send
        '''
        self.serviceTxMsgOnce()
        self.serviceTxOnce()

    def process(self):
        '''
        Allow timer based processing
        Base class does nothing, subclasses override
        '''
        pass


class KeepStack(Stack):
    '''
    RAET protocol base stack object with persistence via Keep attribute.
    Should be subclassed for specific transport type
    '''
    Count = 0
    Uid = 0

    def __init__(self,
                 puid=None,
                 clean=False,
                 cleanlocal=False,
                 cleanremote=False,
                 keep=None,
                 dirpath='',
                 basedirpath='',
                 local=None,  # passed up from subclass
                 name='',
                 uid=None,
                 ha=None,
                 **kwa
                 ):
        '''
        Setup Stack instance

        Parameters:
            puid is the previous uid, defaults to class attribute Uid
            clean if True clears both local and remote keeps before restoring
            cleanlocal if True clears the local keep before restoring
            cleanremote if True clears the remote keeps before restoring
            keep is the keep to use, if falsy a keeping.LotKeep is created
                from dirpath, basedirpath and name
            local is used only when no local data could be restored
            name, uid, ha are used to create local when none is restored or given
            kwa is passed up to Stack.__init__

        Restored local data takes precedence over the local parameter.
        After restoring, .nextSid() is called on every remote and the
        local and remote data are dumped back to the keep.
        '''
        # assigned before super init so the base class does not overwrite it
        if getattr(self, 'puid', None) is None:
            self.puid = puid if puid is not None else self.Uid

        self.keep = keep or keeping.LotKeep(dirpath=dirpath,
                                            basedirpath=basedirpath,
                                            stackname=name)

        if clean or cleanlocal:  # clear persisted data so use provided or default data
            self.clearLocalKeep()

        local = self.restoreLocal() or local or lotting.Lot(stack=self,
                                                            name=name,
                                                            uid=uid,
                                                            ha=ha,
                                                            )
        local.stack = self

        super(KeepStack, self).__init__(puid=puid,
                                        local=local,
                                        **kwa)

        if clean or cleanremote:
            self.clearRemoteKeeps()
        self.restoreRemotes()  # load remotes from saved data

        for remote in self.remotes.values():
            remote.nextSid()

        self.dumpLocal()  # save local data
        self.dumpRemotes()  # save remote data

    def addRemote(self, remote, dump=False):
        '''
        Add a remote to .remotes
        If dump then dump the keep file for the remote
        Returns the remote
        '''
        super(KeepStack, self).addRemote(remote=remote)
        if dump:
            self.dumpRemote(remote)
        return remote

    def moveRemote(self, remote, new, clear=False, dump=False):
        '''
        Move remote with key remote.uid old to key new uid and replace
        the odict key index so the order is the same.
        If clear then clear the keep data stored under remote.name
        If dump then dump the keep file for the remote at new
        '''
        super(KeepStack, self).moveRemote(remote, new=new)
        if clear:
            self.keep.clearRemoteData(remote.name)
        if dump:
            self.dumpRemote(remote=remote)

    def renameRemote(self, remote, new, clear=True, dump=False):
        '''
        Rename remote with old remote.name to new name but keep same index
        If clear then clear the keep data stored under the old name
        If dump then dump the keep file for the renamed remote
        '''
        old = remote.name  # captured before the base class changes it
        super(KeepStack, self).renameRemote(remote=remote, new=new)
        if clear:
            self.keep.clearRemoteData(old)
        if dump:
            self.dumpRemote(remote=remote)

    def removeRemote(self, remote, clear=True):
        '''
        Remove remote
        If clear then also remove from disk
        '''
        super(KeepStack, self).removeRemote(remote=remote)
        if clear:
            self.clearRemote(remote)

    def removeAllRemotes(self, clear=True):
        '''
        Remove all the remotes
        If clear then also remove from disk
        '''
        # real copy since .remotes is changed in-place while looping
        remotes = list(self.remotes.values())
        for remote in remotes:
            self.removeRemote(remote, clear=clear)

    def clearAllDir(self):
        '''
        Clear out and remove the keep dir and contents
        '''
        console.verbose("Stack {0}: Clearing keep dir '{1}'\n".format(
                                  self.name, self.keep.dirpath))
        self.keep.clearAllDir()

    def dumpLocal(self):
        '''
        Dump keeps of local
        '''
        self.keep.dumpLocal(self.local)

    def restoreLocal(self):
        '''
        Load self.local from keep file if any and return local
        Otherwise return None
        Keep data that fails verification is cleared
        '''
        local = None
        data = self.keep.loadLocalData()
        if data:
            if self.keep.verifyLocalData(data):
                local = lotting.Lot(stack=self,
                                    uid=data['uid'],
                                    name=data['name'],
                                    ha=data['ha'],
                                    sid=data['sid'])
                self.local = local
            else:
                self.keep.clearLocalData()
        return local

    def clearLocalKeep(self):
        '''
        Clear local keep
        '''
        self.keep.clearLocalData()

    def dumpRemote(self, remote):
        '''
        Dump keeps of remote
        '''
        self.keep.dumpRemote(remote)

    def dumpRemotes(self, clear=True):
        '''
        Dump all remotes data to keep files
        If clear then clear all files first
        '''
        if clear:
            self.clearRemotes()
        for remote in self.remotes.values():
            self.dumpRemote(remote)

    def restoreRemote(self, name):
        '''
        Load, add, and return remote with name if any
        Otherwise return None
        Keep data that fails verification is cleared
        '''
        remote = None
        data = self.keep.loadRemoteData(name)
        if data:
            if self.keep.verifyRemoteData(data):
                remote = lotting.Lot(stack=self,
                                     uid=data['uid'],
                                     name=data['name'],
                                     ha=data['ha'],
                                     sid=data['sid'])
                self.addRemote(remote)
            else:
                self.keep.clearRemoteData(name)
        return remote

    def restoreRemotes(self):
        '''
        Load and add remote for each remote file
        Keep data that fails verification is cleared
        '''
        keeps = self.keep.loadAllRemoteData()
        if keeps:
            for name, data in keeps.items():
                if self.keep.verifyRemoteData(data):
                    remote = lotting.Lot(stack=self,
                                         uid=data['uid'],
                                         name=data['name'],
                                         ha=data['ha'],
                                         sid=data['sid'])
                    self.addRemote(remote)
                else:
                    self.keep.clearRemoteData(name)

    def clearRemote(self, remote):
        '''
        Clear remote keep of remote
        '''
        self.keep.clearRemoteData(remote.name)

    def clearRemotes(self):
        '''
        Clear remote keeps of .remotes
        '''
        for remote in self.remotes.values():
            self.clearRemote(remote)

    def clearRemoteKeeps(self):
        '''
        Clear all remote keeps
        '''
        self.keep.clearAllRemoteData()

    def clearAllKeeps(self):
        '''
        Clear local keep and all remote keeps
        '''
        self.clearLocalKeep()
        self.clearRemoteKeeps()