1# -*- coding: utf-8 -*- 2''' 3stacking.py raet protocol stacking classes 4''' 5# pylint: skip-file 6# pylint: disable=W0611 7 8# Import python libs 9import socket 10import os 11import errno 12 13from collections import deque, Mapping 14try: 15 import simplejson as json 16except ImportError: 17 import json 18 19try: 20 import msgpack 21except ImportError: 22 mspack = None 23 24# Import ioflo libs 25from ioflo.aid.odicting import odict 26from ioflo.base import nonblocking 27 28# Import raet libs 29from ..abiding import * # import globals 30from .. import raeting 31from ..raeting import PcktKind, TrnsKind, CoatKind, FootKind, BodyKind, HeadKind 32from .. import nacling 33from .. import stacking 34from . import keeping 35from . import packeting 36from . import estating 37from . import transacting 38 39from ioflo.base.consoling import getConsole 40console = getConsole() 41 42class RoadStack(stacking.KeepStack): 43 ''' 44 RAET protocol RoadStack for UDP communications. This is the primary 45 network communication system in RAET. A stack does not work in the 46 same way as a socket, instead transmit (tx) and recive (rx) lists become 47 populated or emptied when calls are made to the transmit and recieve 48 methods. 49 50 name 51 The name to give the stack and local estate, if no name is given it will be 52 automatically assigned 53 main 54 Flag indicating if the local estate is a main estate on the road 55 mutable 56 Flag indicating if credentials on road can be changed after initial join 57 veritive 58 Flag indicating if all received message transactions must be verified 59 keep 60 Pass in a keep object, this object can define how stack data 61 including keys is persisted to disk 62 dirpath 63 The location on the filesystem to use for stack caching 64 uid 65 The local estate id, if None is specified a default will be assigned 66 ha 67 The local estate host address, this is a tuple of (network_addr, port) that will 68 be bound to by the stack for accepting incoming packets 69 bufcnt 70 The number of messages to buffer, defaults to 2 71 auto 72 auto acceptance mode indicating how keys should be accepted 73 one of never, once, always 74 period 75 The default iteration timeframe to use for the background management 76 of the presence system. Defaults to 1.0 77 offset 78 The default offset to the start of period 79 interim 80 The default timeout to reap a dead remote 81 role 82 The local estate role identifier for key management 83 ''' 84 Count = 0 # count of Stack instances to give unique stack names 85 Hk = HeadKind.raet.value # stack default 86 Bk = BodyKind.json.value # stack default 87 Fk = FootKind.nacl.value # stack default 88 Ck = CoatKind.nacl.value # stack default 89 Bf = False # stack default for bcstflag 90 BurstSize = 0 # stack default for max segments in each burst, 0 = no limit 91 Period = 1.0 # stack default for keep alive 92 Offset = 0.5 # stack default for keep alive 93 Interim = 3600 # stack default for reap timeout 94 JoinerTimeout = 5.0 # stack default for joiner transaction timeout 95 JoinentTimeout = 5.0 # stack default for joinent transaction timeout 96 MsgStaleTimeout = 600.0 # stale messages waiting timeout 97 98 def __init__(self, 99 puid=None, 100 keep=None, 101 dirpath='', 102 basedirpath='', 103 auto=None, 104 local=None, #passed up from subclass 105 name='', 106 uid=None, #local estate uid, none means generate it 107 ha=None, 108 eha=None, 109 iha=None, 110 role=None, 111 sigkey=None, 112 prikey=None, 113 bufcnt=2, 114 kind=None, 115 mutable=None, 116 period=None, 117 offset=None, 118 interim=None, 119 veritive=True, 120 **kwa 121 ): 122 ''' 123 Setup instance 124 125 ''' 126 if getattr(self, 'puid', None) is None: 127 self.puid = puid if puid is not None else self.Uid 128 129 keep = keep or keeping.RoadKeep(dirpath=dirpath, 130 basedirpath=basedirpath, 131 stackname=name, 132 auto=auto) 133 134 ha = ha if ha is not None else ("", raeting.RAET_PORT) 135 136 local = local or estating.LocalEstate(stack=self, 137 name=name, 138 uid=uid, 139 ha=eha or ha, 140 iha=iha, 141 role=role, 142 sigkey=sigkey, 143 prikey=prikey, ) 144 local.stack = self 145 146 self.aha = ha # init before server is initialized 147 148 # Remotes reference these in their init so create before super 149 self.period = period if period is not None else self.Period 150 self.offset = offset if offset is not None else self.Offset 151 self.interim = interim if interim is not None else self.Interim 152 153 super(RoadStack, self).__init__(puid=puid, 154 keep=keep, 155 dirpath=dirpath, 156 basedirpath=basedirpath, 157 local=local, 158 bufcnt=bufcnt, 159 **kwa) 160 self.kind = kind # application kind associated with the local estate 161 self.mutable = mutable # road data mutability 162 self.veritive = True if veritive else False # rx message trans must be verified 163 self.joinees = odict() # remotes for vacuous joins, keyed by ha 164 self.alloweds = odict() # allowed remotes keyed by name 165 self.aliveds = odict() # alived remotes keyed by name 166 self.reapeds = odict() # reaped remotes keyed by name 167 self.availables = set() # set of available remote names 168 169 @property 170 def ha(self): 171 ''' 172 property that returns host address for accepting packets (listening ha) 173 ''' 174 return self.aha 175 176 @ha.setter 177 def ha(self, value): 178 self.aha = value 179 180 @property 181 def transactions(self): 182 ''' 183 property that returns list of transactions in all remotes 184 ''' 185 transactions = [] 186 for remote in self.remotes.values(): 187 transactions.extend(remote.transactions.values()) 188 return transactions 189 190 def serverFromLocal(self): 191 ''' 192 Create local listening server for stack 193 ''' 194 server = nonblocking.SocketUdpNb(ha=self.ha, 195 bufsize=raeting.UDP_MAX_PACKET_SIZE * self.bufcnt) 196 return server 197 198 def addRemote(self, remote, dump=False): 199 ''' 200 Add a remote to .remotes 201 ''' 202 super(RoadStack, self).addRemote(remote=remote, dump=dump) 203 if remote.timer.store is not self.store: 204 raise raeting.StackError("Store reference mismatch between remote" 205 " '{0}' and stack '{1}'".format(remote.name, stack.name)) 206 return remote 207 208 def removeRemote(self, remote, clear=True): 209 ''' 210 Remove remote at key uid. 211 If clear then also remove from disk 212 ''' 213 super(RoadStack, self).removeRemote(remote=remote, clear=clear) 214 for transaction in remote.transactions.values(): 215 transaction.nack() 216 217 def fetchRemoteByKeys(self, sighex, prihex): 218 ''' 219 Search for remote with matching (name, sighex, prihex) 220 Return remote if found Otherwise return None 221 ''' 222 for remote in self.remotes.values(): 223 if (remote.signer.keyhex == sighex or 224 remote.priver.keyhex == prihex): 225 return remote 226 227 return None 228 229 def retrieveRemote(self, uid=None): 230 ''' 231 Used when initiating a transaction 232 233 If uid is not None Then returns remote at duid if exists or None 234 If uid is None Then uses first remote unless no remotes then None 235 ''' 236 if uid is not None: 237 remote = self.remotes.get(uid, None) 238 else: 239 if self.remotes: 240 remote = self.remotes.values()[0] # zeroth is default 241 else: 242 remote = None 243 return remote 244 245 def createRemote(self, ha): 246 ''' 247 Use for vacuous join to create new remote 248 ''' 249 if not ha: 250 console.terse("Invalid host address = {0} when creating remote.".format(ha)) 251 self.incStat("failed_createremote") 252 return None 253 254 remote = self.newRemote(stack=self, 255 fuid=0, # vacuous join 256 sid=0, # always 0 for join 257 ha=ha) #if ha is not None else dha 258 259 try: 260 self.addRemote(remote) #provisionally add .accepted is None 261 except raeting.StackError as ex: 262 console.terse(str(ex) + '\n') 263 self.incStat("failed_addremote") 264 return None 265 266 return remote 267 268 def newRemote(self, **kwa): 269 ''' 270 Used as a wrapper to create new remotes 271 Override to add additional kwa validations 272 ''' 273 return estating.RemoteEstate(**kwa) 274 275 def dumpLocalRole(self): 276 ''' 277 Dump role keep of local 278 ''' 279 self.keep.dumpLocalRole(self.local) 280 281 def restoreLocal(self): 282 ''' 283 Load local estate if keeps found and verified and return 284 otherwise return None 285 ''' 286 local = None 287 keepData = self.keep.loadLocalData() 288 if keepData: 289 if self.keep.verifyLocalData(keepData): 290 ha = keepData['ha'] 291 iha = keepData['iha'] 292 aha = keepData['aha'] 293 local = estating.LocalEstate(stack=self, 294 uid=keepData['uid'], 295 name=keepData['name'], 296 ha=tuple(ha) if ha else ha, 297 iha=tuple(iha) if iha else iha, 298 natted=keepData['natted'], 299 fqdn=keepData['fqdn'], 300 dyned=keepData['dyned'], 301 sid=keepData['sid'], 302 role=keepData['role'], 303 sigkey=keepData['sighex'], 304 prikey=keepData['prihex'],) 305 self.puid = keepData['puid'] 306 self.aha = tuple(aha) if aha else aha 307 self.local = local 308 else: 309 self.keep.clearLocalData() 310 return local 311 312 def clearLocalRoleKeep(self): 313 ''' 314 Clear local keep 315 ''' 316 self.keep.clearLocalRoleData() 317 318 def dumpRemoteRole(self, remote): 319 ''' 320 Dump keeps of remote 321 ''' 322 self.keep.dumpRemoteRole(remote) 323 324 def restoreRemote(self, name): 325 ''' 326 Load, add, and return remote with name if any 327 Otherwise return None 328 ''' 329 remote = None 330 keepData = self.keep.loadRemoteData(name) 331 if keepData: 332 if self.keep.verifyRemoteData(keepData): 333 ha = keepData['ha'] 334 iha = keepData['iha'] 335 remote = self.newRemote(ha=tuple(ha) if ha else ha, 336 stack=self, 337 uid=keepData['uid'], 338 fuid=keepData['fuid'], 339 name=keepData['name'], 340 iha=tuple(iha) if iha else iha, 341 natted=keepData['natted'], 342 fqdn=keepData['fqdn'], 343 dyned=keepData['dyned'], 344 sid=keepData['sid'], 345 main=keepData['main'], 346 kind=keepData['kind'], 347 joined=keepData['joined'], 348 acceptance=keepData['acceptance'], 349 verkey=keepData['verhex'], 350 pubkey=keepData['pubhex'], 351 role=keepData['role']) 352 if remote: 353 self.addRemote(remote) 354 else: 355 self.keep.clearRemoteData(name) 356 return remote 357 358 def restoreRemotes(self): 359 ''' 360 Load .remotes from valid keep data if any 361 ''' 362 keeps = self.keep.loadAllRemoteData() 363 if keeps: 364 for name, keepData in keeps.items(): 365 if self.keep.verifyRemoteData(keepData): 366 ha = keepData['ha'] 367 iha = keepData['iha'] 368 remote = self.newRemote(ha=tuple(ha) if ha else ha, 369 stack=self, 370 uid=keepData['uid'], 371 fuid=keepData['fuid'], 372 name=keepData['name'], 373 iha=tuple(iha) if iha else iha, 374 natted=keepData['natted'], 375 fqdn=keepData['fqdn'], 376 dyned=keepData['dyned'], 377 sid=keepData['sid'], 378 main=keepData['main'], 379 kind=keepData['kind'], 380 joined=keepData['joined'], 381 acceptance=keepData['acceptance'], 382 verkey=keepData['verhex'], 383 pubkey=keepData['pubhex'], 384 role=keepData['role']) 385 if remote: 386 self.addRemote(remote) 387 else: 388 self.keep.clearRemoteData(name) 389 390 def clearRemoteRoleKeeps(self): 391 ''' 392 Clear all remote keeps 393 ''' 394 self.keep.clearAllRemoteRoleData() 395 396 def clearAllKeeps(self): 397 super(RoadStack, self).clearAllKeeps() 398 self.clearLocalRoleKeep() 399 self.clearRemoteRoleKeeps() 400 401 def manage(self, cascade=False, immediate=False): 402 ''' 403 Manage remote estates. Time based processing of remote status such as 404 presence (keep alive) etc. 405 406 cascade induces the alive transactions to run join, allow, alive until 407 failure or alive success 408 409 immediate indicates to run first attempt immediately and not wait for timer 410 411 availables = dict of remotes that are both alive and allowed 412 ''' 413 alloweds = odict() 414 aliveds = odict() 415 reapeds = odict() 416 for remote in self.remotes.values(): # should not start anything 417 remote.manage(cascade=cascade, immediate=immediate) 418 if remote.allowed: 419 alloweds[remote.name] = remote 420 if remote.alived: 421 aliveds[remote.name] = remote 422 if remote.reaped: 423 reapeds[remote.name] = remote 424 425 old = set(self.aliveds.keys()) 426 current = set(aliveds.keys()) 427 plus = current.difference(old) 428 minus = old.difference(current) 429 self.availables = current 430 self.changeds = odict(plus=plus, minus=minus) 431 self.alloweds = alloweds 432 self.aliveds = aliveds 433 self.reapeds = reapeds 434 435 def _handleOneRx(self): 436 ''' 437 Handle on message from .rxes deque 438 Assumes that there is a message on the .rxes deque 439 ''' 440 raw, sa = self.rxes.popleft() 441 console.verbose("{0} received packet\n{1}\n".format(self.name, raw)) 442 443 packet = packeting.RxPacket(stack=self, packed=raw) 444 try: 445 packet.parseOuter() 446 except raeting.PacketError as ex: 447 console.terse(str(ex) + '\n') 448 self.incStat('parsing_outer_error') 449 return 450 451 sh, sp = sa 452 packet.data.update(sh=sh, sp=sp) 453 self.processRx(packet) 454 455 def processRx(self, packet): 456 ''' 457 Process packet via associated transaction or 458 reply with new correspondent transaction 459 ''' 460 console.profuse("{0} received packet data\n{1}\n".format(self.name, packet.data)) 461 console.verbose("{0} received packet index: (rf={1[0]}, le={1[1]}, re={1[2]}," 462 " si={1[3]}, ti={1[4]}, bf={1[5]})\n".format(self.name, packet.index)) 463 try: 464 tkname = TrnsKind(packet.data['tk']) 465 except ValueError as ex: 466 tkname = None 467 try: 468 pkname = PcktKind(packet.data['pk']) 469 except ValueError as ex: 470 pkname = None 471 console.verbose("{0} received trans kind = '{1}' packet kind = '{2}'" 472 "\n".format(self.name, tkname, pkname)) 473 474 bf = packet.data['bf'] 475 if bf: 476 return # broadcast transaction not yet supported 477 478 de = packet.data['de'] # remote nuid 479 se = packet.data['se'] # remote fuid 480 tk = packet.data['tk'] 481 pk = packet.data['pk'] 482 cf = packet.data['cf'] 483 rsid = packet.data['si'] 484 485 remote = None 486 487 if tk in [TrnsKind.join]: # join transaction 488 sha = (packet.data['sh'], packet.data['sp']) 489 if rsid != 0: # join must use sid == 0 490 emsg = ("Stack '{0}'. Nonzero join sid '{1}' in packet from {2}." 491 " Dropping...\n".format(self.name, rsid, sha)) 492 console.terse(emsg) 493 self.incStat('join_invalid_sid') 494 return 495 496 if cf: # cf = not rf, packet source is joinent, destination (self) is joiner 497 if de == 0: # invalid since joiner rxed packet de (nuid) == 0 498 emsg = ("Stack '{0}'. Invalid join correspondence from '{1}'," 499 " nuid zero . Dropping...\n".format(self.name, sha)) 500 console.terse(emsg) 501 self.incStat('join_invalid_nuid') 502 return 503 if se == 0: # vacuous join since se (fuid) == 0 504 remote = self.joinees.get(sha, None) # match remote by rha from .joinees 505 if remote and remote.nuid != de: # prior different 506 emsg = ("Stack '{0}'. Invalid join correspondence from '{1}'," 507 "nuid {2} mismatch prior {3} . Dropping...\n".format( 508 self.name, sha, de, remote.nuid)) 509 self.incStat('join_mismatch_nuid') # drop inconsistent nuid 510 return 511 else: # non vacuous join match by nuid from .remotes 512 remote = self.remotes.get(de, None) 513 514 else: # (rf = not cf) # source is joiner, destination (self) is joinent 515 if se == 0: # invalid join 516 emsg = ("Stack '{0}'. Invalid join initiatance from '{1}'," 517 "fuid zero. Dropping...\n".format(self.name, sha)) 518 console.terse(emsg) 519 self.incStat('join_invalid_fuid') 520 return 521 if de == 0: # vacuous join match remote by rha from joinees 522 remote = self.joinees.get(sha, None) 523 if remote and remote.fuid != se: # check if prior is stale 524 if remote.fuid != 0: # stale 525 emsg = ("Stack '{0}'. Prior stale join initiatance from '{1}'," 526 " fuid {2} mismatch prior {3}. Removing prior...\n".format( 527 self.name, sha, se, remote.fuid)) 528 console.terse(emsg) 529 del self.joinees[sha] # remove prior stale vacuous joinee 530 remote = None # reset 531 532 if not remote: # no current joinees for remote initiator at rha 533 # is it not first packet of join 534 if pk not in [PcktKind.request]: 535 emsg = ("Stack '{0}'. Stale join initiatance from '{1}'," 536 " Not a request and no remote. Dropping...\n".format( 537 self.name, sha)) 538 console.terse(emsg) 539 self.incStat('join_stale') 540 return 541 542 # create vacuous remote will be assigned to joinees in joinent 543 remote = self.newRemote(stack=self, 544 fuid=0, # was fuid=se 545 sid=rsid, 546 ha=sha) 547 548 else: # nonvacuous join match by nuid from .remotes 549 remote = self.remotes.get(de, None) 550 if not remote: # remote with nuid not exist 551 # reject renew , so tell it to retry with vacuous join 552 emsg = ("Stack '{0}'. Stale nuid '{1}' in packet from {2}." 553 " Renewing....\n".format( self.name, de, sha)) 554 console.terse(emsg) 555 self.incStat('stale_nuid') 556 remote = self.newRemote(stack=self, 557 fuid=se, 558 sid=rsid, 559 ha=sha) 560 if not remote: 561 return 562 563 self.replyStale(packet, remote, renew=True) # nack stale transaction 564 return 565 566 else: # not join transaction 567 if rsid == 0: # cannot use sid == 0 on nonjoin transaction 568 emsg = ("Stack '{0}'. Invalid Zero sid '{1}' for transaction {2} packet" 569 " {3}. Dropping...\n".format(self.name, rsid, tk, pk)) 570 console.terse(emsg) 571 self.incStat('invalid_sid') 572 return 573 574 if de == 0 or se == 0: 575 emsg = ("Stack '{0}'. Invalid nonjoin from remote '{1}'." 576 " Zero nuid {2} or fuid {3}. Dropping...\n".format( 577 self.name, sha, de, se)) 578 console.terse(emsg) 579 self.incStat('invalid_uid') 580 return 581 582 remote = self.remotes.get(de, None) 583 584 if remote: 585 if not cf: # packet from remotely initiated transaction 586 if not remote.validRsid(rsid): # invalid rsid 587 emsg = ("Stack '{0}'. Invalid nonjoin from '{1}'. Invalid sid " 588 " {2} in packet given prior sid {3}. " 589 "Dropping...\n".format(self.name, 590 remote.name, 591 rsid, 592 remote.rsid)) 593 console.terse(emsg) 594 self.incStat('stale_sid') 595 self.replyStale(packet, remote) # nack stale transaction 596 return 597 598 if rsid != remote.rsid: # updated valid rsid so change remote.rsid 599 remote.rsid = rsid 600 remote.removeStaleCorrespondents() 601 602 if remote.reaped: 603 remote.unreap() # packet a valid packet so remote is not dead 604 605 if remote: 606 trans = remote.transactions.get(packet.index, None) 607 if trans: 608 trans.receive(packet) 609 return 610 611 if cf: # cf = not rf packet from correspondent to non-existent locally initiated transaction 612 self.stale(packet) 613 return 614 615 if not remote: 616 emsg = ("Stack '{0}'. Unknown remote destination '{1}'. " 617 "Dropping...\n".format(self.name, de)) 618 console.terse(emsg) 619 self.incStat('unknown_destination_uid') 620 return 621 622 self.correspond(packet, remote) # correspond to new transaction initiated by remote 623 624 def correspond(self, packet, remote): 625 ''' 626 Create correspondent transaction remote and handle packet 627 ''' 628 if (packet.data['tk'] == TrnsKind.join and 629 packet.data['pk'] == PcktKind.request): 630 self.replyJoin(packet, remote) 631 return 632 633 if (packet.data['tk'] == TrnsKind.allow and 634 packet.data['pk'] == PcktKind.hello): 635 self.replyAllow(packet, remote) 636 return 637 638 if (packet.data['tk'] == TrnsKind.alive and 639 packet.data['pk'] == PcktKind.request): 640 self.replyAlive(packet, remote) 641 return 642 643 if (packet.data['tk'] == TrnsKind.message and 644 packet.data['pk'] == PcktKind.message): 645 # transaction with this ID already handled and removed packet is a stale resend 646 if packet.data['af'] or packet.data['ti'] in remote.doneTransactions: 647 self.replyStale(packet, remote) 648 else: 649 self.replyMessage(packet, remote) 650 return 651 652 self.incStat('stale_packet') 653 654 def process(self): 655 ''' 656 Call .process or all remotes to allow timer based processing 657 of their transactions 658 ''' 659 #for transaction in self.transactions.values(): 660 #transaction.process() 661 for remote in self.remotes.values(): 662 remote.process() 663 664 def parseInner(self, packet): 665 ''' 666 Parse inner of packet and return 667 Assume all drop checks done 668 ''' 669 try: 670 packet.parseInner() 671 console.verbose("Stack '{0}'. Received packet body\n{1}\n".format( 672 self.name, packet.body.data)) 673 except raeting.PacketError as ex: 674 console.terse(str(ex) + '\n') 675 self.incStat('parsing_inner_error') 676 return None 677 return packet 678 679 def stale(self, packet): 680 ''' 681 Initiate stale transaction in order to nack a stale correspondent packet 682 but only for preexisting remotes 683 ''' 684 if packet.data['pk'] in [PcktKind.nack, 685 PcktKind.unjoined, 686 PcktKind.unallowed, 687 PcktKind.renew, 688 PcktKind.refuse, 689 PcktKind.reject,]: 690 return # ignore stale nacks 691 create = False 692 uid = packet.data['de'] 693 remote = self.retrieveRemote(uid=uid) 694 if not remote: 695 emsg = "Stack '{0}'. Unknown remote id '{1}', fail to nack stale\n".format( 696 self.name, uid) 697 console.terse(emsg) 698 self.incStat('invalid_remote_eid') 699 return 700 data = odict(hk=self.Hk, bk=self.Bk) 701 staler = transacting.Staler(stack=self, 702 remote=remote, 703 kind=packet.data['tk'], 704 sid=packet.data['si'], 705 tid=packet.data['ti'], 706 txData=data, 707 rxPacket=packet) 708 staler.nack() 709 710 def replyStale(self, packet, remote, renew=False): 711 ''' 712 Correspond to stale initiated transaction 713 ''' 714 if packet.data['pk'] in [PcktKind.nack, 715 PcktKind.unjoined, 716 PcktKind.unallowed, 717 PcktKind.refuse, 718 PcktKind.reject,]: 719 return # ignore stale nacks 720 data = odict(hk=self.Hk, bk=self.Bk) 721 stalent = transacting.Stalent(stack=self, 722 remote=remote, 723 kind=packet.data['tk'], 724 sid=packet.data['si'], 725 tid=packet.data['ti'], 726 txData=data, 727 rxPacket=packet) 728 if renew: 729 stalent.nack(kind=PcktKind.renew.value) # refuse and renew 730 else: 731 stalent.nack() 732 733 def join(self, uid=None, timeout=None, cascade=False, renewal=False): 734 ''' 735 Initiate join transaction 736 ''' 737 remote = self.retrieveRemote(uid=uid) 738 if not remote: 739 emsg = "Invalid remote destination estate id '{0}'\n".format(uid) 740 console.terse(emsg) 741 self.incStat('invalid_remote_eid') 742 return 743 744 timeout = timeout if timeout is not None else self.JoinerTimeout 745 data = odict(hk=self.Hk, bk=self.Bk) 746 joiner = transacting.Joiner(stack=self, 747 remote=remote, 748 timeout=timeout, 749 txData=data, 750 cascade=cascade, 751 renewal=renewal) 752 joiner.join() 753 754 def replyJoin(self, packet, remote, timeout=None): 755 ''' 756 Correspond to new join transaction 757 ''' 758 timeout = timeout if timeout is not None else self.JoinentTimeout 759 data = odict(hk=self.Hk, bk=self.Bk) 760 joinent = transacting.Joinent(stack=self, 761 remote=remote, 762 timeout=timeout, 763 sid=packet.data['si'], 764 tid=packet.data['ti'], 765 txData=data, 766 rxPacket=packet) 767 joinent.join() 768 769 def allow(self, uid=None, timeout=None, cascade=False): 770 ''' 771 Initiate allow transaction 772 ''' 773 remote = self.retrieveRemote(uid=uid) 774 if not remote: 775 emsg = "Invalid remote destination estate id '{0}'\n".format(uid) 776 console.terse(emsg) 777 self.incStat('invalid_remote_eid') 778 return 779 data = odict(hk=self.Hk, bk=BodyKind.raw.value, fk=self.Fk) 780 allower = transacting.Allower(stack=self, 781 remote=remote, 782 timeout=timeout, 783 txData=data, 784 cascade=cascade) 785 allower.hello() 786 787 def replyAllow(self, packet, remote): 788 ''' 789 Correspond to new allow transaction 790 ''' 791 data = odict(hk=self.Hk, bk=BodyKind.raw.value, fk=self.Fk) 792 allowent = transacting.Allowent(stack=self, 793 remote=remote, 794 sid=packet.data['si'], 795 tid=packet.data['ti'], 796 txData=data, 797 rxPacket=packet) 798 allowent.hello() 799 800 def alive(self, uid=None, timeout=None, cascade=False): 801 ''' 802 Initiate alive transaction 803 If duid is None then create remote at ha 804 ''' 805 remote = self.retrieveRemote(uid=uid) 806 if not remote: 807 emsg = "Invalid remote destination estate id '{0}'\n".format(uid) 808 console.terse(emsg) 809 self.incStat('invalid_remote_eid') 810 return 811 data = odict(hk=self.Hk, bk=self.Bk, fk=self.Fk, ck=self.Ck) 812 aliver = transacting.Aliver(stack=self, 813 remote=remote, 814 timeout=timeout, 815 txData=data, 816 cascade=cascade) 817 aliver.alive() 818 819 def replyAlive(self, packet, remote): 820 ''' 821 Correspond to new Alive transaction 822 ''' 823 data = odict(hk=self.Hk, bk=self.Bk, fk=self.Fk, ck=self.Ck) 824 alivent = transacting.Alivent(stack=self, 825 remote=remote, 826 bcst=packet.data['bf'], 827 sid=packet.data['si'], 828 tid=packet.data['ti'], 829 txData=data, 830 rxPacket=packet) 831 alivent.alive() 832 833 def transmit(self, msg, uid=None, timeout=None): 834 ''' 835 Append duple (msg, uid) to .txMsgs deque 836 If msg is not mapping then raises exception 837 If uid is None then it will default to the first entry in .remotes 838 If timeout is None then it will use Messenger default 839 timeout of 0 means never timeout of message transaction 840 ''' 841 if not isinstance(msg, Mapping): 842 emsg = "Invalid msg, not a mapping {0}\n".format(msg) 843 console.terse(emsg) 844 self.incStat("invalid_transmit_body") 845 return 846 if uid is None: 847 if not self.remotes: 848 emsg = "No remote to send to\n" 849 console.terse(emsg) 850 self.incStat("invalid_destination") 851 return 852 uid = self.remotes.values()[0].uid 853 self.txMsgs.append((msg, uid, timeout)) 854 855 def _handleOneTxMsg(self): 856 ''' 857 Take one message from .txMsgs deque and handle it 858 Assumes there is a message on the deque 859 ''' 860 # triple (body dict, destination uid, timout) 861 body, uid, timeout = self.txMsgs.popleft() 862 self.message(body, uid=uid, timeout=timeout) 863 console.verbose("{0} sending\n{1}\n".format(self.name, body)) 864 865 def message(self, body, uid=None, timeout=None): 866 ''' 867 Initiate message transaction to remote at duid 868 If uid is None then create remote at ha 869 If timeout is None then use Messenger default 870 If timeout is 0 then never timeout 871 ''' 872 remote = self.retrieveRemote(uid=uid) 873 if not remote: 874 emsg = "Invalid remote destination estate id '{0}'\n".format(uid) 875 console.terse(emsg) 876 self.incStat('invalid_remote_uid') 877 return 878 data = odict(hk=self.Hk, bk=self.Bk, fk=self.Fk, ck=self.Ck) 879 messenger = transacting.Messenger(stack=self, 880 remote=remote, 881 timeout=timeout, 882 txData=data, 883 bcst=self.Bf, 884 burst=self.BurstSize) 885 messenger.message(body) 886 887 def replyMessage(self, packet, remote): 888 ''' 889 Correspond to new Message transaction 890 ''' 891 data = odict(hk=self.Hk, bk=self.Bk, fk=self.Fk, ck=self.Ck) 892 messengent = transacting.Messengent(stack=self, 893 remote=remote, 894 bcst=packet.data['bf'], 895 sid=packet.data['si'], 896 tid=packet.data['ti'], 897 txData=data, 898 rxPacket=packet) 899 messengent.message() 900 901