1# -*- coding: utf-8 -*- 2''' 3stacking.py raet protocol stacking classes 4''' 5# pylint: skip-file 6# pylint: disable=W0611 7 8# Import python libs 9import socket 10import binascii 11import struct 12 13try: 14 import simplejson as json 15except ImportError: 16 import json 17 18# Import ioflo libs 19from ioflo.aid.odicting import odict 20from ioflo.aid.osetting import oset 21from ioflo.aid.timing import StoreTimer 22from ioflo.aid.aiding import packByte, unpackByte 23 24# Import raet libs 25from ..abiding import * # import globals 26from .. import raeting 27from ..raeting import Acceptance, PcktKind, TrnsKind, CoatKind, FootKind 28from .. import nacling 29from . import packeting 30from . import estating 31 32from ioflo.base.consoling import getConsole 33console = getConsole() 34 35class Transaction(object): 36 ''' 37 RAET protocol transaction class 38 ''' 39 Timeout = 5.0 # default timeout 40 41 def __init__(self, stack=None, remote=None, kind=None, timeout=None, 42 rmt=False, bcst=False, sid=None, tid=None, 43 txData=None, txPacket=None, rxPacket=None): 44 ''' 45 Setup Transaction instance 46 timeout of 0.0 means no timeout go forever 47 ''' 48 self.stack = stack 49 self.remote = remote 50 self.kind = kind or raeting.PACKET_DEFAULTS['tk'] 51 52 if timeout is None: 53 timeout = self.Timeout 54 self.timeout = timeout 55 self.timer = StoreTimer(self.stack.store, duration=self.timeout) 56 57 self.rmt = rmt # remote initiator 58 self.bcst = bcst # bf flag 59 60 self.sid = sid 61 self.tid = tid 62 63 self.txData = txData or odict() # data used to prepare last txPacket 64 self.txPacket = txPacket # last tx packet needed for retries 65 self.rxPacket = rxPacket # last rx packet needed for index 66 67 @property 68 def index(self): 69 ''' 70 Property is transaction tuple (rf, le, re, si, ti, bf,) 71 Not to be used in join (Joiner and Joinent) since bootstrapping 72 Use the txPacket (Joiner) or rxPacket (Joinent) .data instead 73 ''' 74 le = self.remote.nuid 75 re = self.remote.fuid 76 return ((self.rmt, le, re, self.sid, self.tid, self.bcst,)) 77 78 def process(self): 79 ''' 80 Process time based handling of transaction like timeout or retries 81 ''' 82 pass 83 84 def receive(self, packet): 85 ''' 86 Process received packet Subclasses should super call this 87 ''' 88 self.rxPacket = packet 89 90 def transmit(self, packet): 91 ''' 92 Queue tx duple on stack transmit queue 93 ''' 94 try: 95 self.stack.tx(packet.packed, self.remote.uid) 96 except raeting.StackError as ex: 97 console.terse(str(ex) + '\n') 98 self.stack.incStat(self.statKey()) 99 self.remove(remote=self.remote, index=packet.index) 100 return 101 self.txPacket = packet 102 103 def add(self, remote=None, index=None): 104 ''' 105 Add self to remote transactions 106 ''' 107 if not index: 108 index = self.index 109 if not remote: 110 remote = self.remote 111 remote.addTransaction(index, self) 112 113 def remove(self, remote=None, index=None): 114 ''' 115 Remove self from remote transactions 116 ''' 117 if not index: 118 index = self.index 119 if not remote: 120 remote = self.remote 121 if remote: 122 remote.removeTransaction(index, transaction=self) 123 124 def statKey(self): 125 ''' 126 Return the stat name key from class name 127 ''' 128 return ("{0}_transaction_failure".format(self.__class__.__name__.lower())) 129 130 def nack(self, **kwa): 131 ''' 132 Placeholder override in sub class 133 nack to terminate transaction with other side of transaction 134 ''' 135 pass 136 137class Initiator(Transaction): 138 ''' 139 RAET protocol initiator transaction class 140 ''' 141 def __init__(self, **kwa): 142 ''' 143 Setup Transaction instance 144 ''' 145 kwa['rmt'] = False # force rmt to False since local initator 146 super(Initiator, self).__init__(**kwa) 147 148 def process(self): 149 ''' 150 Process time based handling of transaction like timeout or retries 151 ''' 152 if self.timeout > 0.0 and self.timer.expired: 153 self.remove() 154 155class Correspondent(Transaction): 156 ''' 157 RAET protocol correspondent transaction class 158 ''' 159 Requireds = ['sid', 'tid', 'rxPacket'] 160 161 def __init__(self, **kwa): 162 ''' 163 Setup Transaction instance 164 ''' 165 kwa['rmt'] = True # force rmt to True since remote initiator 166 167 missing = [] 168 for arg in self.Requireds: 169 if arg not in kwa: 170 missing.append(arg) 171 if missing: 172 emsg = "Missing required keyword arguments: '{0}'".format(missing) 173 raise TypeError(emsg) 174 175 super(Correspondent, self).__init__(**kwa) 176 177class Staler(Initiator): 178 ''' 179 RAET protocol Staler initiator transaction class 180 ''' 181 def __init__(self, **kwa): 182 ''' 183 Setup Transaction instance 184 ''' 185 for key in ['kind', 'sid', 'tid', 'rxPacket']: 186 if key not in kwa: 187 emsg = "Missing required keyword arguments: '{0}'".format(key) 188 raise TypeError(emsg) 189 super(Staler, self).__init__(**kwa) 190 191 self.prep() 192 193 def prep(self): 194 ''' 195 Prepare .txData for nack to stale 196 ''' 197 self.txData.update( 198 dh=self.rxPacket.data['sh'], # may need for index 199 dp=self.rxPacket.data['sp'], # may need for index 200 se=self.remote.nuid, 201 de=self.rxPacket.data['se'], 202 tk=self.kind, 203 cf=self.rmt, 204 bf=self.bcst, 205 si=self.sid, 206 ti=self.tid, 207 ck=self.rxPacket.data['ck'], # CoatKind.nada.value, 208 fk=self.rxPacket.data['fk'], # FootKind.nada.value 209 ) 210 211 def nack(self): 212 ''' 213 Send nack to stale packet from correspondent. 214 This is used when a correspondent packet is received but no matching 215 Initiator transaction is found. So create a dummy initiator and send 216 a nack packet back. Do not add transaction so don't need to remove it. 217 ''' 218 ha = (self.rxPacket.data['sh'], self.rxPacket.data['sp']) 219 try: 220 tkname = TrnsKind(self.rxPacket.data['tk']) 221 except ValueError as ex: 222 tkname = None 223 try: 224 pkname = TrnsKind(self.rxPacket.data['pk']) 225 except ValueError as ex: 226 pkname = None 227 228 emsg = ("Staler '{0}'. Stale transaction '{1}' packet '{2}' from '{3}' in {4} " 229 "nacking...\n".format(self.stack.name, tkname, pkname, ha, self.tid)) 230 console.terse(emsg) 231 self.stack.incStat('stale_correspondent_attempt') 232 233 if self.rxPacket.data['se'] not in self.stack.remotes: 234 emsg = "Staler '{0}'. Unknown correspondent estate id '{1}'\n".format( 235 self.stack.name, self.rxPacket.data['se']) 236 console.terse(emsg) 237 self.stack.incStat('unknown_correspondent_uid') 238 #return #maybe we should return and not respond at all in this case 239 240 body = odict() 241 packet = packeting.TxPacket(stack=self.stack, 242 kind=PcktKind.nack.value, 243 embody=body, 244 data=self.txData) 245 try: 246 packet.pack() 247 except raeting.PacketError as ex: 248 console.terse(str(ex) + '\n') 249 self.stack.incStat("packing_error") 250 return 251 252 self.stack.txes.append((packet.packed, ha)) 253 console.terse("Staler '{0}'. Do Nack of stale correspondent {1} in {2} at {3}\n".format( 254 self.stack.name, ha, self.tid, self.stack.store.stamp)) 255 self.stack.incStat('stale_correspondent_nack') 256 257 258class Stalent(Correspondent): 259 ''' 260 RAET protocol Stalent correspondent transaction class 261 ''' 262 Requireds = ['kind', 'sid', 'tid', 'rxPacket'] 263 264 def __init__(self, **kwa): 265 ''' 266 Setup Transaction instance 267 ''' 268 super(Stalent, self).__init__(**kwa) 269 270 self.prep() 271 272 def prep(self): 273 ''' 274 Prepare .txData for nack to stale 275 ''' 276 self.txData.update( 277 dh=self.rxPacket.data['sh'], # may need for index 278 dp=self.rxPacket.data['sp'], # may need for index 279 se=self.rxPacket.data['de'], 280 de=self.rxPacket.data['se'], 281 tk=self.kind, 282 cf=self.rmt, 283 bf=self.bcst, 284 si=self.sid, 285 ti=self.tid, 286 ck=self.rxPacket.data['ck'], # CoatKind.nada.value 287 fk=self.rxPacket.data['fk'], # FootKind.nada.value 288 ) 289 290 def nack(self, kind=PcktKind.nack.value): 291 ''' 292 Send nack to stale packet from initiator. 293 This is used when a initiator packet is received but with a stale session id 294 So create a dummy correspondent and send a nack packet back. 295 Do not add transaction so don't need to remove it. 296 ''' 297 ha = (self.rxPacket.data['sh'], self.rxPacket.data['sp']) 298 try: 299 tkname = TrnsKind(self.rxPacket.data['tk']) 300 except ValueError as ex: 301 tkname = None 302 try: 303 pkname = TrnsKind(self.rxPacket.data['pk']) 304 except ValueError as ex: 305 pkname = None 306 307 emsg = ("Stalent '{0}'. Stale transaction '{1}' packet '{2}' from '{3}' in {4} " 308 "nacking ...\n".format(self.stack.name, tkname, pkname, ha, self.tid)) 309 console.terse(emsg) 310 self.stack.incStat('stale_initiator_attempt') 311 312 if self.rxPacket.data['se'] not in self.stack.remotes: 313 emsg = "Stalent '{0}'. Unknown initiator estate id '{1}'\n".format( 314 self.stack.name, 315 self.rxPacket.data['se']) 316 console.terse(emsg) 317 self.stack.incStat('unknown_initiator_uid') 318 #return #maybe we should return and not respond at all in this case 319 320 body = odict() 321 packet = packeting.TxPacket(stack=self.stack, 322 kind=kind, 323 embody=body, 324 data=self.txData) 325 try: 326 packet.pack() 327 except raeting.PacketError as ex: 328 console.terse(str(ex) + '\n') 329 self.stack.incStat("packing_error") 330 return 331 332 if kind == PcktKind.renew: 333 console.terse("Stalent '{0}'. Do Renew of {1} in {2} at {3}\n".format( 334 self.stack.name, ha, self.tid, self.stack.store.stamp)) 335 elif kind == PcktKind.refuse: 336 console.terse("Stalent '{0}'. Do Refuse of {1} in {2} at {3}\n".format( 337 self.stack.name, ha, self.tid, self.stack.store.stamp)) 338 elif kind == PcktKind.reject: 339 console.terse("Stalent '{0}'. Do Reject of {1} in {2} at {3}\n".format( 340 self.stack.name, ha, self.tid, self.stack.store.stamp)) 341 elif kind == PcktKind.nack: 342 console.terse("Stalent '{0}'. Do Nack of {1} in {2} at {3}\n".format( 343 self.stack.name, ha, self.tid, self.stack.store.stamp)) 344 else: 345 console.terse("Stalent '{0}'. Invalid nack kind {1}. Do Nack of {2} anyway " 346 " to {3) at {4}\n".format(self.stack.name, 347 kind, 348 ha, 349 self.tid, 350 self.stack.store.stamp)) 351 kind == PcktKind.nack 352 353 self.stack.txes.append((packet.packed, ha)) 354 self.stack.incStat('stale_initiator_nack') 355 356class Joiner(Initiator): 357 ''' 358 RAET protocol Joiner Initiator class Dual of Joinent 359 360 Joiner must always add new remote since always must anticipate response to 361 request. 362 ''' 363 RedoTimeoutMin = 1.0 # initial timeout 364 RedoTimeoutMax = 4.0 # max timeout 365 PendRedoTimeout = 60.0 # Redo timeout when pended 366 367 def __init__(self, 368 redoTimeoutMin=None, 369 redoTimeoutMax=None, 370 pendRedoTimeout=None, 371 cascade=False, 372 renewal=False, 373 **kwa): 374 ''' 375 Setup Transaction instance 376 ''' 377 kwa['kind'] = TrnsKind.join.value 378 super(Joiner, self).__init__(**kwa) 379 380 self.cascade = cascade 381 382 self.redoTimeoutMax = redoTimeoutMax or self.RedoTimeoutMax 383 self.redoTimeoutMin = redoTimeoutMin or self.RedoTimeoutMin 384 self.redoTimer = StoreTimer(self.stack.store, 385 duration=self.redoTimeoutMin) 386 self.pendRedoTimeout = pendRedoTimeout or self.PendRedoTimeout 387 388 self.sid = 0 #always 0 for join 389 self.tid = self.remote.nextTid() 390 # fuid is assigned during join but want to preserve vacuousness for remove 391 self.vacuous = (self.remote.fuid == 0) 392 self.renewal = renewal # is current join a renew, vacuous rejoin 393 self.pended = False # Farside Correspondent has pended remote acceptance 394 self.prep() 395 # don't dump remote yet since its ephemeral until we join and get valid uid 396 397 def transmit(self, packet): 398 ''' 399 Augment transmit with restart of redo timer 400 ''' 401 super(Joiner, self).transmit(packet) 402 self.redoTimer.restart() 403 404 def add(self, remote=None, index=None): 405 ''' 406 Augment with add self.remote to stack.joinees if vacuous 407 ''' 408 super(Joiner, self).add(remote=remote, index=index) 409 # self.remote is now assigned 410 if self.vacuous: # vacuous 411 self.stack.joinees[self.remote.ha] = self.remote 412 413 def remove(self, remote=None, index=None): 414 ''' 415 Remove self from stack transactions 416 ''' 417 super(Joiner, self).remove(remote=remote, index=index) 418 # self.remote is now assigned 419 if self.vacuous: # vacuous 420 if self.remote.ha in self.stack.joinees and not self.remote.transactions: 421 del self.stack.joinees[self.remote.ha] 422 423 def receive(self, packet): 424 """ 425 Process received packet belonging to this transaction 426 """ 427 super(Joiner, self).receive(packet) # self.rxPacket = packet 428 429 if packet.data['tk'] == TrnsKind.join: 430 if packet.data['pk'] == PcktKind.pend: # pending 431 self.stack.incStat('joiner_rx_pend') 432 self.pend() 433 elif packet.data['pk'] == PcktKind.response: # accepted 434 self.stack.incStat('joiner_rx_response') 435 self.accept() 436 elif packet.data['pk'] == PcktKind.nack: #stale 437 self.stack.incStat('joiner_rx_nack') 438 self.refuse() 439 elif packet.data['pk'] == PcktKind.refuse: #refused 440 self.stack.incStat('joiner_rx_refuse') 441 self.refuse() 442 elif packet.data['pk'] == PcktKind.renew: #renew 443 self.stack.incStat('joiner_rx_renew') 444 self.renew() 445 elif packet.data['pk'] == PcktKind.reject: #rejected 446 self.stack.incStat('joiner_rx_reject') 447 self.reject() 448 449 def process(self): 450 ''' 451 Perform time based processing of transaction 452 ''' 453 if self.timeout > 0.0 and self.timer.expired: 454 if self.txPacket and self.txPacket.data['pk'] == PcktKind.request: 455 self.remove(index=self.txPacket.index) 456 else: 457 self.remove(index=self.index) # in case never sent txPacket 458 459 console.concise("Joiner {0}. Timed out with {1} in {2} at {3}\n".format( 460 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 461 462 return 463 464 # need keep sending join until accepted or timed out 465 if self.redoTimer.expired: 466 if not self.pended: 467 duration = min( 468 max(self.redoTimeoutMin, 469 self.redoTimer.duration * 2.0), 470 self.redoTimeoutMax) 471 else: 472 duration = self.pendRedoTimeout 473 474 self.redoTimer.restart(duration=duration) 475 if (self.txPacket and 476 self.txPacket.data['pk'] == PcktKind.request): 477 self.transmit(self.txPacket) #redo 478 console.concise("Joiner {0}. Redo Join with {1} in {2} at {3}\n".format( 479 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 480 self.stack.incStat('joiner_tx_join_redo') 481 else: #check to see if status has changed to accept after other kind 482 if self.remote: 483 status = self.stack.keep.statusRemote(self.remote, dump=True) 484 if status == Acceptance.accepted: 485 self.completify() 486 elif status == Acceptance.rejected: 487 "Joiner {0}: Estate '{1}' uid '{2}' keys rejected\n".format( 488 self.stack.name, self.remote.name, self.remote.uid) 489 self.stack.removeRemote(self.remote, clear=True) 490 # removeRemote also nacks 491 492 def prep(self): 493 ''' 494 Prepare .txData 495 ''' 496 self.txData.update( 497 dh=self.remote.ha[0], # may need for index 498 dp=self.remote.ha[1], # may need for index 499 se=self.remote.nuid, 500 de=self.remote.fuid, 501 tk=self.kind, 502 cf=self.rmt, 503 bf=self.bcst, 504 si=self.sid, 505 ti=self.tid, 506 ck=CoatKind.nada.value, 507 fk=FootKind.nada.value 508 ) 509 510 def join(self): 511 ''' 512 Send join request 513 ''' 514 joins = self.remote.joinInProcess() 515 if joins: 516 emsg = ("Joiner {0}. Join with {1} already in process. " 517 "Aborting...\n".format( 518 self.stack.name, 519 self.remote.name)) 520 console.concise(emsg) 521 return 522 523 self.remote.joined = None 524 525 if self.stack.kind is None: 526 self.stack.kind = 0 527 else: 528 if self.stack.kind < 0 or self.stack.kind > 255: 529 emsg = ("Joiner {0}. Invalid application kind field value {1} for {2}. " 530 "Aborting...\n".format( 531 self.stack.name, 532 self.stack.kind, 533 self.remote.name)) 534 console.concise(emsg) 535 return 536 537 flags = [0, 0, 0, 0, 0, 0, 0, self.stack.main] # stack operation mode flags 538 operation = packByte(fmt=b'11111111', fields=flags) 539 body = odict([('name', self.stack.local.name), 540 ('mode', operation), 541 ('kind', self.stack.kind), 542 ('verhex', str(self.stack.local.signer.verhex.decode('ISO-8859-1')) 543 if self.stack.local.signer.verhex else None ), 544 ('pubhex', str(self.stack.local.priver.pubhex.decode('ISO-8859-1')) 545 if self.stack.local.priver.pubhex else None), 546 ('role', self.stack.local.role)]) 547 packet = packeting.TxPacket(stack=self.stack, 548 kind=PcktKind.request.value, 549 embody=body, 550 data=self.txData) 551 try: 552 packet.pack() 553 except raeting.PacketError as ex: 554 console.terse(str(ex) + '\n') 555 self.stack.incStat("packing_error") 556 self.remove() 557 return 558 console.concise("Joiner {0}. Do Join with {1} in {2} at {3}\n".format( 559 self.stack.name, 560 self.remote.name, 561 self.tid, 562 self.stack.store.stamp)) 563 self.transmit(packet) 564 self.add(index=self.txPacket.index) 565 566 def renew(self): 567 ''' 568 Perform renew in response to nack renew 569 Reset to vacuous Road data and try joining again if not main 570 Otherwise act as if rejected 571 ''' 572 # renew not allowed on immutable road 573 if not self.stack.mutable: 574 self.stack.incStat('join_renew_unallowed') 575 emsg = ("Joiner {0}. Renew from '{1}' not allowed on immutable" 576 " road\n".format(self.stack.name, self.remote.name)) 577 console.terse(emsg) 578 self.refuse() 579 return 580 581 console.terse("Joiner {0}. Renew from {1} in {2} at {3}\n".format( 582 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 583 self.stack.incStat('join_renew_attempt') 584 self.remove(index=self.txPacket.index) 585 if self.remote: 586 self.remote.fuid = 0 # forces vacuous join 587 self.stack.dumpRemote(self.remote) # since change fuid 588 self.stack.join(uid=self.remote.uid, timeout=self.timeout, renewal=True) 589 590 def pend(self): 591 ''' 592 Process ack pend to join packet 593 ''' 594 if not self.stack.parseInner(self.rxPacket): 595 return 596 self.pended = True 597 598 def accept(self): 599 ''' 600 Perform acceptance in response to join response packet 601 ''' 602 if not self.stack.parseInner(self.rxPacket): 603 return 604 605 data = self.rxPacket.data 606 body = self.rxPacket.body.data 607 608 name = body.get('name') 609 if not name: 610 emsg = "Missing remote name in accept packet\n" 611 console.terse(emsg) 612 self.stack.incStat('invalid_accept') 613 self.remove(index=self.txPacket.index) 614 return 615 616 mode = body.get('mode') 617 if mode is None or not isinstance(mode, int) or mode < 0 or mode > 255: 618 emsg = "Missing or invalid remote stack operation mode in accept packet\n" 619 console.terse(emsg) 620 self.stack.incStat('invalid_accept') 621 self.remove(index=self.txPacket.index) 622 return 623 flags = unpackByte(fmt=b'11111111', byte=mode, boolean=True) 624 main = flags[7] 625 626 kind = body.get('kind') 627 if kind is None: 628 emsg = "Missing or invalid remote application kind in accept packet\n" 629 console.terse(emsg) 630 self.stack.incStat('invalid_accept') 631 self.remove(index=self.txPacket.index) 632 return 633 634 fuid = body.get('uid') 635 if not fuid: # None or zero 636 emsg = "Missing or invalid remote farside uid in accept packet\n" 637 console.terse(emsg) 638 self.stack.incStat('invalid_accept') 639 self.remove(index=self.txPacket.index) 640 return 641 642 verhex = body.get('verhex', '') 643 if not verhex: 644 emsg = "Missing remote verifier key in accept packet\n" 645 console.terse(emsg) 646 self.stack.incStat('invalid_accept') 647 self.remove(index=self.txPacket.index) 648 return 649 650 pubhex = body.get('pubhex', '') 651 if not pubhex: 652 emsg = "Missing remote crypt key in accept packet\n" 653 console.terse(emsg) 654 self.stack.incStat('invalid_accept') 655 self.remove(index=self.txPacket.index) 656 return 657 658 role = body.get('role') 659 if not role: 660 emsg = "Missing remote role in accept packet\n" 661 console.terse(emsg) 662 self.stack.incStat('invalid_accept') 663 self.remove(index=self.txPacket.index) 664 return 665 666 rha = (data['sh'], data['sp']) 667 reid = data['se'] 668 leid = data['de'] 669 670 if self.vacuous: 671 self.remote.fuid = fuid 672 if not self.renewal: # ephemeral like 673 if name != self.remote.name: 674 if name in self.stack.nameRemotes: 675 emsg = ("Joiner {0}. New name '{1}' unavailable for " 676 "remote {2}\n".format(self.stack.name, 677 name, 678 self.remote.name)) 679 console.terse(emsg) 680 self.nack(kind=PcktKind.reject.value) 681 return 682 try: 683 self.stack.renameRemote(self.remote, new=name) 684 except raeting.StackError as ex: 685 console.terse(str(ex) + '\n') 686 self.stack.incStat(self.statKey()) 687 self.remove(index=self.txPacket.index) 688 return 689 self.remote.main = main 690 self.remote.kind = kind 691 self.remote.fuid = fuid 692 self.remote.role = role 693 self.remote.verfer = nacling.Verifier(verhex) # verify key manager 694 self.remote.pubber = nacling.Publican(pubhex) # long term crypt key manager 695 696 sameRoleKeys = (role == self.remote.role and 697 ns2b(verhex) == self.remote.verfer.keyhex and 698 ns2b(pubhex) == self.remote.pubber.keyhex) 699 700 sameAll = (sameRoleKeys and 701 name == self.remote.name and 702 rha == self.remote.ha and 703 fuid == self.remote.fuid and 704 main == self.remote.main and 705 kind == self.remote.kind) 706 707 if not sameAll and not self.stack.mutable: 708 emsg = ("Joiner {0}. Attempt to change immutable road by " 709 "'{1}'\n".format(self.stack.name, 710 self.remote.name)) 711 console.terse(emsg) 712 self.nack(kind=PcktKind.reject.value) # reject not mutable road 713 self.remove(index=self.txPacket.index) 714 return 715 716 status = self.stack.keep.statusRole(role=role, 717 verhex=verhex, 718 pubhex=pubhex, 719 dump=True) 720 721 if status == Acceptance.rejected: 722 if sameRoleKeys: 723 self.stack.removeRemote(self.remote, clear=True) 724 # remove also nacks so will also reject 725 else: 726 self.nack(kind=PcktKind.reject.value) # reject 727 return 728 729 # accepted or pending 730 self.remote.acceptance = status # change acceptance of remote 731 732 if not sameAll: # (and mutable) 733 if (name in self.stack.nameRemotes and 734 self.stack.nameRemotes[name] is not self.remote): # non unique name 735 emsg = "Joiner {0}. Name '{1}' unavailable for remote {2}\n".format( 736 self.stack.name, name, self.remote.name) 737 console.terse(emsg) 738 self.nack(kind=PcktKind.reject.value) 739 return 740 741 if name != self.remote.name: 742 try: 743 self.stack.renameRemote(self.remote, new=name) 744 except raeting.StackError as ex: 745 console.terse(str(ex) + '\n') 746 self.stack.incStat(self.statKey()) 747 self.remove(index=self.txPacket.index) 748 return 749 750 if rha != self.remote.ha: 751 self.remote.ha = rha 752 if fuid != self.remote.fuid: 753 self.remote.fuid = fuid 754 if main != self.remote.main: 755 self.remote.main = main 756 if kind != self.remote.kind: 757 self.remote.kind = kind 758 if self.remote.role != role: 759 self.remote.role = role # rerole 760 if ns2b(verhex) != self.remote.verfer.keyhex: 761 self.remote.verfer = nacling.Verifier(verhex) # verify key manager 762 if ns2b(pubhex) != self.remote.pubber.keyhex: 763 self.remote.pubber = nacling.Publican(pubhex) # long term crypt key manager 764 # don't dump until complete 765 766 if status == Acceptance.accepted: # accepted 767 self.completify() 768 return 769 770 # else status == raeting.acceptance.pending or None 771 self.pendify() 772 773 def pendify(self): 774 ''' 775 Perform pending on remote 776 ''' 777 self.stack.dumpRemote(self.remote) 778 self.ackPend() 779 780 def ackPend(self): 781 ''' 782 Send ack pending to accept response 783 ''' 784 body = odict() 785 packet = packeting.TxPacket(stack=self.stack, 786 kind=PcktKind.pend.value, 787 embody=body, 788 data=self.txData) 789 try: 790 packet.pack() 791 except raeting.PacketError as ex: 792 console.terse(str(ex) + '\n') 793 self.stack.incStat("packing_error") 794 self.remove(index=self.txPacket.index) 795 return 796 797 console.concise("Joiner {0}. Do Ack Pend of {1} in {2} at {3}\n".format( 798 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 799 800 self.transmit(packet) 801 802 def completify(self): 803 ''' 804 Finalize full acceptance 805 ''' 806 if self.remote.sid == 0: # session id must be non-zero after join 807 self.remote.nextSid() # start new session 808 self.remote.replaceStaleInitiators() # this join not stale since sid == 0 809 if self.vacuous: 810 self.remote.rsid = 0 # reset .rsid on vacuous join so allow will work 811 self.remote.joined = True #accepted 812 self.stack.dumpRemote(self.remote) 813 self.stack.dumpLocal() #persist puid 814 self.ackAccept() 815 816 def ackAccept(self): 817 ''' 818 Send ack accepted to accept response 819 ''' 820 body = odict() 821 packet = packeting.TxPacket(stack=self.stack, 822 kind=PcktKind.ack.value, 823 embody=body, 824 data=self.txData) 825 try: 826 packet.pack() 827 except raeting.PacketError as ex: 828 console.terse(str(ex) + '\n') 829 self.stack.incStat("packing_error") 830 self.remove(index=self.txPacket.index) 831 return 832 833 console.concise("Joiner {0}. Do Ack Accept, Done with {1} in {2} at {3}\n".format( 834 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 835 self.stack.incStat("join_initiate_complete") 836 837 self.transmit(packet) 838 self.remove(index=self.txPacket.index) # self.rxPacket.index 839 840 if self.cascade: 841 self.stack.allow(uid=self.remote.uid, cascade=self.cascade, timeout=self.timeout) 842 843 def refuse(self): 844 ''' 845 Process nack to join packet refused as join already in progress or some 846 other problem that does not change the joined attribute 847 ''' 848 if not self.stack.parseInner(self.rxPacket): 849 return 850 console.terse("Joiner {0}. Refused by {1} in {2} at {3}\n".format( 851 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 852 self.stack.incStat(self.statKey()) 853 self.remove(index=self.txPacket.index) 854 855 def reject(self): 856 ''' 857 Process nack to join packet, join rejected 858 ''' 859 if not self.stack.parseInner(self.rxPacket): 860 return 861 console.terse("Joiner {0}. Rejected by {1} in {2} at {3}\n".format( 862 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 863 self.stack.incStat(self.statKey()) 864 self.remove(index=self.txPacket.index) 865 self.stack.removeRemote(self.remote, clear=True) 866 867 def nack(self, kind=PcktKind.nack.value): 868 ''' 869 Send nack to accept response 870 ''' 871 body = odict() 872 packet = packeting.TxPacket(stack=self.stack, 873 kind=kind, 874 embody=body, 875 data=self.txData) 876 try: 877 packet.pack() 878 except raeting.PacketError as ex: 879 console.terse(str(ex) + '\n') 880 self.stack.incStat("packing_error") 881 self.remove(index=self.txPacket.index) 882 return 883 884 if kind == PcktKind.refuse: 885 console.terse("Joiner {0}. Do Nack Refuse of {1} in {2} at {3}\n".format( 886 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 887 elif kind == PcktKind.reject: 888 console.terse("Joiner {0}. Do Nack Reject of {1} in {2} at {3}\n".format( 889 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 890 elif kind == PcktKind.nack: 891 console.terse("Joiner {0}. Do Nack of {1} in {2} at {3}\n".format( 892 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 893 else: 894 console.terse("Joiner {0}. Invalid nack kind {1}. Do Nack of {2} anyway " 895 "in {3} at {4}\n".format(self.stack.name, 896 kind, 897 self.remote.name, 898 self.tid, 899 self.stack.store.stamp)) 900 kind == PcktKind.nack 901 self.stack.incStat(self.statKey()) 902 self.transmit(packet) 903 self.remove(index=self.txPacket.index) 904 905class Joinent(Correspondent): 906 ''' 907 RAET protocol Joinent transaction class, dual of Joiner 908 909 Joinent does not add new remote to .remotes if rejected 910 ''' 911 RedoTimeoutMin = 0.1 # initial timeout 912 RedoTimeoutMax = 2.0 # max timeout 913 PendRedoTimeout = 60.0 # redo timeout when pended 914 915 def __init__(self, 916 redoTimeoutMin=None, 917 redoTimeoutMax=None, 918 pendRedoTimeout=None, 919 **kwa): 920 ''' 921 Setup Transaction instance 922 ''' 923 kwa['kind'] = TrnsKind.join.value 924 super(Joinent, self).__init__(**kwa) 925 926 self.redoTimeoutMax = redoTimeoutMax or self.RedoTimeoutMax 927 self.redoTimeoutMin = redoTimeoutMin or self.RedoTimeoutMin 928 self.redoTimer = StoreTimer(self.stack.store, duration=0.0) 929 self.pendRedoTimeout = pendRedoTimeout or self.PendRedoTimeout 930 self.vacuous = None # gets set in join method 931 self.pended = False # Farside initiator has pended remote acceptance 932 self.prep() 933 934 def transmit(self, packet): 935 ''' 936 Augment transmit with restart of redo timer 937 ''' 938 super(Joinent, self).transmit(packet) 939 self.redoTimer.restart() 940 941 def add(self, remote=None, index=None): 942 ''' 943 Augment with add self.remote to stack.joinees if vacuous 944 ''' 945 super(Joinent, self).add(remote=remote, index=index) 946 # self.remote is now assigned 947 if self.vacuous: # vacuous happens when both sides vacuous 948 self.stack.joinees[self.remote.ha] = self.remote 949 950 def remove(self, remote=None, index=None): 951 ''' 952 Remove self from stack transactions 953 ''' 954 super(Joinent, self).remove(remote=remote, index=index) 955 # self.remote is now assigned 956 if self.vacuous: # vacuous 957 if self.remote.ha in self.stack.joinees and not self.remote.transactions: 958 del self.stack.joinees[self.remote.ha] 959 960 def receive(self, packet): 961 """ 962 Process received packet belonging to this transaction 963 """ 964 super(Joinent, self).receive(packet) # self.rxPacket = packet 965 966 if packet.data['tk'] == TrnsKind.join: 967 if packet.data['pk'] == PcktKind.request: 968 self.stack.incStat('joinent_rx_request') 969 self.join() 970 elif packet.data['pk'] == PcktKind.pend: # maybe pending 971 self.stack.incStat('joinent_rx_pend') 972 self.pend() 973 elif packet.data['pk'] == PcktKind.ack: #accepted by joiner 974 self.stack.incStat('joinent_rx_ack') 975 self.complete() 976 elif packet.data['pk'] == PcktKind.nack: #stale 977 self.stack.incStat('joinent_rx_nack') 978 self.refuse() 979 elif packet.data['pk'] == PcktKind.refuse: #refused 980 self.stack.incStat('joinent_rx_refuse') 981 self.refuse() 982 elif packet.data['pk'] == PcktKind.reject: #rejected 983 self.stack.incStat('joinent_rx_reject') 984 self.reject() 985 986 def process(self): 987 ''' 988 Perform time based processing of transaction 989 990 ''' 991 if self.timeout > 0.0 and self.timer.expired: 992 self.nack() # stale 993 console.concise("Joinent {0}. Timed out with {1} in {2} at {3}\n".format( 994 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 995 return 996 997 # need to perform the check for accepted status and then send accept 998 if self.redoTimer.expired: 999 if not self.pended: 1000 duration = min( 1001 max(self.redoTimeoutMin, 1002 self.redoTimer.duration * 2.0), 1003 self.redoTimeoutMax) 1004 else: 1005 duration = self.pendRedoTimeout 1006 1007 self.redoTimer.restart(duration=duration) 1008 1009 if (self.txPacket and 1010 self.txPacket.data['pk'] == PcktKind.response): 1011 self.transmit(self.txPacket) #redo 1012 console.concise("Joinent {0}. Redo Accept with {1} in {2} at {3}\n".format( 1013 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 1014 self.stack.incStat('joinent_tx_accept_redo') 1015 else: #check to see if status has changed to accept 1016 if self.remote: 1017 status = self.stack.keep.statusRemote(self.remote, dump=True) 1018 if status == Acceptance.accepted: 1019 self.ackAccept() 1020 elif status == Acceptance.rejected: 1021 "Stack {0}: Estate '{1}' uid '{2}' keys rejected\n".format( 1022 self.stack.name, self.remote.name, self.remote.uid) 1023 self.stack.removeRemote(self.remote,clear=True) 1024 # removeRemote also nacks 1025 1026 def prep(self): 1027 ''' 1028 Prepare .txData 1029 ''' 1030 #since bootstrap transaction use the reversed seid and deid from packet 1031 self.txData.update( 1032 dh=self.rxPacket.data['sh'], # may need for index 1033 dp=self.rxPacket.data['sp'], # may need for index 1034 se=self.rxPacket.data['de'], 1035 de=self.rxPacket.data['se'], 1036 tk=self.kind, 1037 cf=self.rmt, 1038 bf=self.bcst, 1039 si=self.sid, 1040 ti=self.tid, 1041 ck=CoatKind.nada.value, 1042 fk=FootKind.nada.value, 1043 ) 1044 1045 def join(self): 1046 ''' 1047 Process join packet 1048 Each estate must have a set of unique credentials on the road 1049 The credentials are. 1050 uid (estate id), name, ha (host address, port) 1051 Each of the three credentials must be separably unique on the Road, that is 1052 the uid must be unique, the name must be unique, the ha must be unique. 1053 1054 The other credentials are the role and keys. Multiple estates may share 1055 the same role and associated keys. The keys are the signing key and the 1056 encryption key. 1057 1058 Once an estate has joined the first time it will be assigned an uid. 1059 Changing any of the credentials after this requires that the Road be mutable. 1060 1061 ''' 1062 if not self.stack.parseInner(self.rxPacket): 1063 return 1064 1065 # Don't add transaction yet wait till later until transaction is permitted 1066 # as not a duplicate and role keys are not rejected 1067 1068 data = self.rxPacket.data 1069 body = self.rxPacket.body.data 1070 1071 name = body.get('name') 1072 if not name: 1073 emsg = "Missing remote name in join packet\n" 1074 console.terse(emsg) 1075 self.stack.incStat('invalid_join') 1076 self.remove(index=self.rxPacket.index) 1077 return 1078 1079 mode = body.get('mode') 1080 if mode is None or not isinstance(mode, int) or mode < 0 or mode > 255: 1081 emsg = "Missing or invalid remote stack operation mode in join packet\n" 1082 console.terse(emsg) 1083 self.stack.incStat('invalid_join') 1084 self.remove(index=self.rxPacket.index) 1085 return 1086 flags = unpackByte(fmt=b'11111111', byte=mode, boolean=True) 1087 main = flags[7] 1088 1089 kind = body.get('kind') 1090 if kind is None: 1091 emsg = "Missing or invalid remote application kind in join packet\n" 1092 console.terse(emsg) 1093 self.stack.incStat('invalid_join') 1094 self.remove(index=self.rxPacket.index) 1095 return 1096 1097 verhex = body.get('verhex', '') 1098 if not verhex: 1099 emsg = "Missing remote verifier key in join packet\n" 1100 console.terse(emsg) 1101 self.stack.incStat('invalid_join') 1102 self.remove(index=self.rxPacket.index) 1103 return 1104 1105 pubhex = body.get('pubhex', '') 1106 if not pubhex: 1107 emsg = "Missing remote crypt key in join packet\n" 1108 console.terse(emsg) 1109 self.stack.incStat('invalid_join') 1110 self.remove(index=self.rxPacket.index) 1111 return 1112 1113 role = body.get('role') 1114 if not role: 1115 emsg = "Missing remote role in join packet\n" 1116 console.terse(emsg) 1117 self.stack.incStat('invalid_join') 1118 self.remove(index=self.rxPacket.index) 1119 return 1120 1121 rha = (data['sh'], data['sp']) 1122 reid = data['se'] 1123 leid = data['de'] 1124 1125 self.vacuous = (leid == 0) 1126 1127 joins = self.remote.joinInProcess() 1128 for join in joins: # only one join at a time is permitted 1129 if join is self: # duplicate join packet so drop 1130 emsg = ("Joinent {0}. Duplicate join from {1}. " 1131 "Dropping...\n".format(self.stack.name, self.remote.name)) 1132 console.concise(emsg) 1133 self.stack.incStat('duplicate_join_attempt') 1134 return 1135 1136 if join.rmt: # is already a correspondent to a join 1137 emsg = ("Joinent {0}. Another joinent already in process with {1}. " 1138 "Aborting...\n".format(self.stack.name, self.remote.name)) 1139 console.concise(emsg) 1140 self.stack.incStat('redundant_join_attempt') 1141 self.nack(kind=PcktKind.refuse.value) 1142 return 1143 1144 else: # already initiator join in process, resolve race condition 1145 if self.vacuous and not join.vacuous: # non-vacuous beats vacuous 1146 emsg = ("Joinent {0}. Already initiated non-vacuous join with {1}. " 1147 "Aborting because vacuous...\n".format( 1148 self.stack.name, self.remote.name)) 1149 console.concise(emsg) 1150 self.stack.incStat('redundant_join_attempt') 1151 self.nack(kind=PcktKind.refuse.value) 1152 return 1153 1154 if not self.vacuous and join.vacuous: # non-vacuous beats vacuous 1155 emsg = ("Joinent {0}. Removing vacuous initiator join with" 1156 " {1}. Proceeding because not vacuous...\n".format( 1157 self.stack.name, self.remote.name)) 1158 console.concise(emsg) 1159 join.nack(kind=PcktKind.refuse.value) 1160 1161 else: # both vacuous or non-vacuous, so use name to resolve 1162 if self.stack.local.name < name: # abort local correspondent and remote initiator 1163 emsg = ("Joinent {0}. Already initiated join with {1}. " 1164 "Aborting because lesser local name...\n".format( 1165 self.stack.name, self.remote.name)) 1166 console.concise(emsg) 1167 self.stack.incStat('redundant_join_attempt') 1168 self.nack(kind=PcktKind.refuse.value) 1169 return 1170 1171 else: # nack to abort local initiator and remote correspondent 1172 emsg = ("Joinent {0}. Removing initiator join with {1}. " 1173 "Proceeding because lesser local name...\n".format( 1174 self.stack.name, self.remote.name)) 1175 console.concise(emsg) 1176 join.nack(kind=PcktKind.refuse.value) 1177 1178 if self.vacuous: # vacuous join 1179 if not self.stack.main: 1180 emsg = "Joinent {0}. Invalid vacuous join not main\n".format(self.stack.name) 1181 console.terse(emsg) 1182 self.nack(kind=PcktKind.reject.value) 1183 return 1184 1185 if name in self.stack.nameRemotes: # non ephemeral name match 1186 self.remote = self.stack.nameRemotes[name] # replace so not ephemeral 1187 1188 else: # ephemeral and unique name 1189 self.remote.name = name 1190 self.remote.main = main 1191 self.remote.kind = kind 1192 self.remote.rha = rha 1193 self.remote.role = role 1194 self.remote.verfer = nacling.Verifier(verhex) # verify key manager 1195 self.remote.pubber = nacling.Publican(pubhex) # long term crypt key manager 1196 if self.remote.fuid != reid: 1197 if self.remote.fuid == 0: # vacuous join created remote in stack 1198 self.remote.fuid = reid 1199 else: 1200 emsg = ("Joinent {0}. Mishandled join reid='{1}' != fuid='{2}' for " 1201 "remote {2}\n".format(self.stack.name, reid, self.remote.fuid, name)) 1202 console.terse(emsg) 1203 self.nack(kind=PcktKind.reject.value) 1204 return 1205 1206 else: # non vacuous join 1207 if self.remote is not self.stack.remotes[leid]: # something is wrong 1208 emsg = "Joinent {0}. Mishandled join leid '{1}' for remote {2}\n".format( 1209 self.stack.name, leid, name) 1210 console.terse(emsg) 1211 self.nack(kind=PcktKind.reject.value) 1212 return 1213 1214 1215 sameRoleKeys = (role == self.remote.role and 1216 ns2b(verhex) == self.remote.verfer.keyhex and 1217 ns2b(pubhex) == self.remote.pubber.keyhex) 1218 1219 sameAll = (sameRoleKeys and 1220 name == self.remote.name and 1221 rha == self.remote.ha and 1222 reid == self.remote.fuid and 1223 main == self.remote.main and 1224 kind == self.remote.kind) 1225 1226 if not sameAll and not self.stack.mutable: 1227 emsg = ("Joinent {0}. Attempt to change immutable road by " 1228 "'{1}'\n".format(self.stack.name, 1229 self.remote.name)) 1230 console.terse(emsg) 1231 # reject not mutable road 1232 self.nack(kind=PcktKind.reject.value) 1233 return 1234 1235 status = self.stack.keep.statusRole(role=role, 1236 verhex=verhex, 1237 pubhex=pubhex, 1238 dump=True) 1239 1240 1241 if status == Acceptance.rejected: 1242 emsg = ("Joinent {0}. Keys of role='{1}' rejected for remote name='{2}'" 1243 " nuid='{3}' fuid='{4}' ha='{5}'\n".format(self.stack.name, 1244 self.remote.role, 1245 self.remote.name, 1246 self.remote.nuid, 1247 self.remote.fuid, 1248 self.remote.ha)) 1249 console.concise(emsg) 1250 if sameRoleKeys and self.remote.uid in self.stack.remotes: 1251 self.stack.removeRemote(self.remote, clear=True) #clear remote 1252 # removeRemote also nacks which is a reject 1253 else: # reject as keys rejected 1254 self.nack(kind=PcktKind.reject.value) 1255 return 1256 1257 #accepted or pended 1258 self.remote.acceptance = status 1259 1260 if sameAll: #ephemeral will always be sameAll because assigned above 1261 if self.remote.uid not in self.stack.remotes: # ephemeral 1262 try: 1263 self.stack.addRemote(self.remote) 1264 except raeting.StackError as ex: 1265 console.terse(str(ex) + '\n') 1266 self.stack.incStat(self.statKey()) 1267 return 1268 1269 emsg = ("Joinent {0}. Added new remote name='{1}' nuid='{2}' fuid='{3}' " 1270 "ha='{4}' role='{5}'\n".format(self.stack.name, 1271 self.remote.name, 1272 self.remote.nuid, 1273 self.remote.fuid, 1274 self.remote.ha, 1275 self.remote.role)) 1276 console.concise(emsg) 1277 # do dump until complete 1278 1279 else: # not sameAll (and mutable) 1280 # do both unique name check first so only change road if new unique 1281 if (name in self.stack.nameRemotes and 1282 self.stack.nameRemotes[name] is not self.remote): # non unique name 1283 emsg = "Joinent {0}. Name '{1}' unavailable for remote {2}\n".format( 1284 self.stack.name, name, self.remote.name) 1285 console.terse(emsg) 1286 self.nack(kind=PcktKind.reject.value) 1287 return 1288 1289 if name != self.remote.name: 1290 try: 1291 self.stack.renameRemote(self.remote, new=name) 1292 except raeting.StackError as ex: 1293 console.terse(str(ex) + '\n') 1294 self.stack.incStat(self.statKey()) 1295 return 1296 1297 if rha != self.remote.ha: 1298 self.remote.ha = rha 1299 if reid != self.remote.fuid: 1300 self.remote.fuid = reid 1301 if main != self.remote.main: 1302 self.remote.main = main 1303 if kind != self.remote.kind: 1304 self.remote.kind = kind 1305 if role != self.remote.role: # rerole 1306 self.remote.role = role 1307 if ns2b(verhex) != self.remote.verfer.keyhex: 1308 self.remote.verfer = nacling.Verifier(verhex) # verify key manager 1309 if ns2b(pubhex) != self.remote.pubber.keyhex: 1310 self.remote.pubber = nacling.Publican(pubhex) # long term crypt key manager 1311 1312 # add transaction 1313 self.add(remote=self.remote, index=self.rxPacket.index) 1314 self.remote.joined = None 1315 1316 if status == Acceptance.accepted: 1317 duration = min( 1318 max(self.redoTimeoutMin, 1319 self.redoTimer.duration * 2.0), 1320 self.redoTimeoutMax) 1321 self.redoTimer.restart(duration=duration) 1322 self.ackAccept() 1323 return 1324 1325 # status == raeting.acceptance.pending or status == None: 1326 self.pendify() # change to ackPend 1327 1328 def pendify(self): 1329 ''' 1330 Performing pending operation on remote 1331 ''' 1332 self.stack.dumpRemote(self.remote) 1333 self.ackPend() 1334 1335 def ackPend(self): 1336 ''' 1337 Send ack to join request 1338 ''' 1339 body = odict() 1340 packet = packeting.TxPacket(stack=self.stack, 1341 kind=PcktKind.pend.value, 1342 embody=body, 1343 data=self.txData) 1344 try: 1345 packet.pack() 1346 except raeting.PacketError as ex: 1347 console.terse(str(ex) + '\n') 1348 self.stack.incStat("packing_error") 1349 self.remove(index=self.rxPacket.index) 1350 return 1351 1352 console.concise("Joinent {0}. Do Ack Pending accept of {1} in {2} at {3}\n".format( 1353 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 1354 self.transmit(packet) 1355 1356 def ackAccept(self): 1357 ''' 1358 Send accept response to join request 1359 ''' 1360 if self.stack.kind is None: 1361 self.stack.kind = 0 1362 else: 1363 if self.stack.kind < 0 or self.stack.kind > 255: 1364 emsg = ("Joinent {0}. Invalid application kind field value {1} for {2}. " 1365 "Aborting...\n".format( 1366 self.stack.name, 1367 self.stack.kind, 1368 self.remote.name)) 1369 console.concise(emsg) 1370 return 1371 1372 flags = [0, 0, 0, 0, 0, 0, 0, self.stack.main] # stack operation mode flags 1373 operation = packByte(fmt=b'11111111', fields=flags) 1374 body = odict([ ('name', self.stack.local.name), 1375 ('mode', operation), 1376 ('kind', self.stack.kind), 1377 ('uid', self.remote.uid), 1378 ('verhex', str(self.stack.local.signer.verhex.decode('ISO-8859-1')) 1379 if self.stack.local.signer.verhex else None ), 1380 ('pubhex', str(self.stack.local.priver.pubhex.decode('ISO-8859-1')) 1381 if self.stack.local.priver.pubhex else None), 1382 ('role', self.stack.local.role)]) 1383 packet = packeting.TxPacket(stack=self.stack, 1384 kind=PcktKind.response.value, 1385 embody=body, 1386 data=self.txData) 1387 try: 1388 packet.pack() 1389 except raeting.PacketError as ex: 1390 console.terse(str(ex) + '\n') 1391 self.stack.incStat("packing_error") 1392 self.remove(index=self.rxPacket.index) 1393 return 1394 1395 console.concise("Joinent {0}. Do Accept of {1} in {2} at {3}\n".format( 1396 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 1397 self.transmit(packet) 1398 1399 def pend(self): 1400 ''' 1401 Process ack pend to join packet 1402 ''' 1403 if not self.stack.parseInner(self.rxPacket): 1404 return 1405 self.pended = True 1406 1407 def complete(self): 1408 ''' 1409 process ack to accept response 1410 ''' 1411 if not self.stack.parseInner(self.rxPacket): 1412 return 1413 1414 console.concise("Joinent {0}. Done with {1} in {2} at {3}\n".format( 1415 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 1416 self.stack.incStat("join_correspond_complete") 1417 1418 if self.remote.sid == 0: # session id must be non-zero after join 1419 self.remote.nextSid() # start new session 1420 self.remote.replaceStaleInitiators() 1421 if self.vacuous: 1422 self.remote.rsid = 0 # reset .rsid on vacuous join so allow will work 1423 self.remote.joined = True # accepted 1424 self.stack.dumpRemote(self.remote) 1425 self.stack.dumpLocal() # persist puid 1426 self.remove(index=self.rxPacket.index) 1427 1428 def reject(self): 1429 ''' 1430 Process reject nack because keys rejected 1431 ''' 1432 if not self.stack.parseInner(self.rxPacket): 1433 return 1434 1435 console.terse("Joinent {0}. Rejected by {1} in {2} at {3}\n".format( 1436 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 1437 self.stack.incStat(self.statKey()) 1438 self.remove(index=self.rxPacket.index) 1439 self.stack.removeRemote(self.remote, clear=True) 1440 1441 def refuse(self): 1442 ''' 1443 Process refuse nack because join already in progress or stale 1444 ''' 1445 if not self.stack.parseInner(self.rxPacket): 1446 return 1447 console.terse("Joinent {0}. Refused by {1} in {2} at {3}\n".format( 1448 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 1449 self.stack.incStat(self.statKey()) 1450 self.remove(index=self.rxPacket.index) 1451 1452 def nack(self, kind=PcktKind.nack.value): 1453 ''' 1454 Send nack to join request. 1455 Sometimes nack occurs without remote being added so have to nack using 1456 rxPacket source ha. 1457 ''' 1458 #if not self.remote or self.remote.uid not in self.stack.remotes: 1459 #self.txData.update( dh=self.rxPacket.data['sh'], dp=self.rxPacket.data['sp'],) 1460 #ha = (self.rxPacket.data['sh'], self.rxPacket.data['sp']) 1461 #else: 1462 #ha = self.remote.ha 1463 1464 ha = (self.rxPacket.data['sh'], self.rxPacket.data['sp']) 1465 1466 body = odict() 1467 packet = packeting.TxPacket(stack=self.stack, 1468 kind=kind, 1469 embody=body, 1470 data=self.txData) 1471 try: 1472 packet.pack() 1473 except raeting.PacketError as ex: 1474 console.terse(str(ex) + '\n') 1475 self.stack.incStat("packing_error") 1476 self.remove(index=self.rxPacket.index) 1477 return 1478 1479 if kind == PcktKind.renew: 1480 console.terse("Joinent {0}. Do Nack Renew of {1} in {2} at {3}\n".format( 1481 self.stack.name, ha, self.tid, self.stack.store.stamp)) 1482 elif kind == PcktKind.refuse: 1483 console.terse("Joinent {0}. Do Nack Refuse of {1} in {2} at {3}\n".format( 1484 self.stack.name, ha, self.tid, self.stack.store.stamp)) 1485 elif kind == PcktKind.reject: 1486 console.terse("Joinent {0}. Do Nack Reject of {1} in {2} at {3}\n".format( 1487 self.stack.name, ha, self.tid, self.stack.store.stamp)) 1488 elif kind == PcktKind.nack: 1489 console.terse("Joinent {0}. Do Nack of {1} in {2} at {3}\n".format( 1490 self.stack.name, ha, self.tid, self.stack.store.stamp)) 1491 else: 1492 console.terse("Joinent {0}. Invalid nack kind {1}. Do Nack of {2} anyway " 1493 " in {3} at {4}\n".format(self.stack.name, 1494 kind, 1495 ha, 1496 self.tid, 1497 self.stack.store.stamp)) 1498 kind == PcktKind.nack 1499 1500 self.stack.incStat(self.statKey()) 1501 1502 self.stack.txes.append((packet.packed, ha)) 1503 self.remove(index=self.rxPacket.index) 1504 1505class Allower(Initiator): 1506 ''' 1507 RAET protocol Allower Initiator class Dual of Allowent 1508 CurveCP handshake 1509 ''' 1510 Timeout = 4.0 1511 RedoTimeoutMin = 0.25 # initial timeout 1512 RedoTimeoutMax = 1.0 # max timeout 1513 1514 def __init__(self, redoTimeoutMin=None, redoTimeoutMax=None, 1515 cascade=False, **kwa): 1516 ''' 1517 Setup instance 1518 ''' 1519 kwa['kind'] = TrnsKind.allow.value 1520 super(Allower, self).__init__(**kwa) 1521 1522 self.cascade = cascade 1523 1524 self.redoTimeoutMax = redoTimeoutMax or self.RedoTimeoutMax 1525 self.redoTimeoutMin = redoTimeoutMin or self.RedoTimeoutMin 1526 self.redoTimer = StoreTimer(self.stack.store, 1527 duration=self.redoTimeoutMin) 1528 1529 self.sid = self.remote.sid 1530 self.tid = self.remote.nextTid() 1531 self.oreo = None # cookie from correspondent needed until handshake completed 1532 self.prep() # prepare .txData 1533 1534 def transmit(self, packet): 1535 ''' 1536 Augment transmit with restart of redo timer 1537 ''' 1538 super(Allower, self).transmit(packet) 1539 self.redoTimer.restart() 1540 1541 def receive(self, packet): 1542 """ 1543 Process received packet belonging to this transaction 1544 """ 1545 super(Allower, self).receive(packet) # self.rxPacket = packet 1546 1547 if packet.data['tk'] == TrnsKind.allow: 1548 if packet.data['pk'] == PcktKind.cookie: 1549 self.cookie() 1550 elif packet.data['pk'] == PcktKind.ack: 1551 self.allow() 1552 elif packet.data['pk'] == PcktKind.nack: # rejected 1553 self.refuse() 1554 elif packet.data['pk'] == PcktKind.refuse: # refused 1555 self.refuse() 1556 elif packet.data['pk'] == PcktKind.reject: #rejected 1557 self.reject() 1558 elif packet.data['pk'] == PcktKind.unjoined: # unjoined 1559 self.unjoin() 1560 1561 def process(self): 1562 ''' 1563 Perform time based processing of transaction 1564 ''' 1565 if self.timeout > 0.0 and self.timer.expired: 1566 self.remove() 1567 console.concise("Allower {0}. Timed out with {1} in {2} at {3}\n".format( 1568 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 1569 return 1570 1571 # need keep sending join until accepted or timed out 1572 if self.redoTimer.expired: 1573 duration = min( 1574 max(self.redoTimeoutMin, 1575 self.redoTimer.duration * 2.0), 1576 self.redoTimeoutMax) 1577 self.redoTimer.restart(duration=duration) 1578 if self.txPacket: 1579 if self.txPacket.data['pk'] == PcktKind.hello: 1580 self.transmit(self.txPacket) # redo 1581 console.concise("Allower {0}. Redo Hello with {1} in {2} at {3}\n".format( 1582 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 1583 self.stack.incStat('redo_hello') 1584 1585 if self.txPacket.data['pk'] == PcktKind.initiate: 1586 self.transmit(self.txPacket) # redo 1587 console.concise("Allower {0}. Redo Initiate with {1} in {2} at {3}\n".format( 1588 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 1589 self.stack.incStat('redo_initiate') 1590 1591 if self.txPacket.data['pk'] == PcktKind.ack: 1592 self.transmit(self.txPacket) # redo 1593 console.concise("Allower {0}. Redo Ack Final with {1} in {2} at {3}\n".format( 1594 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 1595 self.stack.incStat('redo_final') 1596 1597 def prep(self): 1598 ''' 1599 Prepare .txData 1600 ''' 1601 self.txData.update( 1602 dh=self.remote.ha[0], # maybe needed for index 1603 dp=self.remote.ha[1], # maybe needed for index 1604 se=self.remote.nuid, 1605 de=self.remote.fuid, 1606 tk=self.kind, 1607 cf=self.rmt, 1608 bf=self.bcst, 1609 si=self.sid, 1610 ti=self.tid, 1611 ) 1612 1613 def hello(self): 1614 ''' 1615 Send hello request 1616 ''' 1617 joins = self.remote.joinInProcess() 1618 if joins: 1619 emsg = ("Allower {0}. Attempt to allow while join still in process with {1}. " 1620 "Aborting...\n".format(self.stack.name, self.remote.name)) 1621 console.concise(emsg) 1622 self.stack.incStat('invalid_allow_attempt') 1623 return 1624 1625 allows = self.remote.allowInProcess() 1626 if allows: 1627 emsg = ("Allower {0}. Allow with {1} already in process\n".format( 1628 self.stack.name, self.remote.name)) 1629 console.concise(emsg) 1630 return 1631 1632 self.remote.allowed = None 1633 if not self.remote.joined: 1634 emsg = "Allower {0}. Must be joined first\n".format(self.stack.name) 1635 console.terse(emsg) 1636 self.stack.incStat('unjoined_remote') 1637 self.stack.join(uid=self.remote.uid, cascade=self.cascade, timeout=self.timeout) 1638 return 1639 1640 self.remote.rekey() # refresh short term keys and reset .allowed to None 1641 self.add() 1642 1643 plain = binascii.hexlify(b''.rjust(32, b'\x00')) 1644 cipher, nonce = self.remote.privee.encrypt(plain, self.remote.pubber.key) 1645 body = raeting.HELLO_PACKER.pack(plain, self.remote.privee.pubraw, cipher, nonce) 1646 1647 packet = packeting.TxPacket(stack=self.stack, 1648 kind=PcktKind.hello, 1649 embody=body, 1650 data=self.txData) 1651 try: 1652 packet.pack() 1653 except raeting.PacketError as ex: 1654 console.terse(str(ex) + '\n') 1655 self.stack.incStat("packing_error") 1656 self.remove() 1657 return 1658 self.transmit(packet) 1659 console.concise("Allower {0}. Do Hello with {1} in {2} at {3}\n".format( 1660 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 1661 1662 def cookie(self): 1663 ''' 1664 Process cookie packet 1665 ''' 1666 if not self.stack.parseInner(self.rxPacket): 1667 return 1668 1669 data = self.rxPacket.data 1670 body = self.rxPacket.body.data 1671 1672 if not isinstance(body, bytes): 1673 emsg = "Invalid format of cookie packet body\n" 1674 console.terse(emsg) 1675 self.stack.incStat('invalid_cookie') 1676 #self.remove() 1677 self.nack(kind=PcktKind.reject.value) 1678 return 1679 1680 if len(body) != raeting.COOKIE_PACKER.size: 1681 emsg = "Invalid length of cookie packet body\n" 1682 console.terse(emsg) 1683 self.stack.incStat('invalid_cookie') 1684 #self.remove() 1685 self.nack(kind=PcktKind.reject.value) 1686 return 1687 1688 cipher, nonce = raeting.COOKIE_PACKER.unpack(body) 1689 1690 try: 1691 msg = self.remote.privee.decrypt(cipher, nonce, self.remote.pubber.key) 1692 except ValueError as ex: 1693 emsg = "Invalid cookie stuff: '{0}'\n".format(str(ex)) 1694 console.terse(emsg) 1695 self.stack.incStat('invalid_cookie') 1696 #self.remove() 1697 self.nack(kind=PcktKind.reject.value) 1698 return 1699 1700 if len(msg) != raeting.COOKIESTUFF_PACKER.size: 1701 emsg = "Invalid length of cookie stuff\n" 1702 console.terse(emsg) 1703 self.stack.incStat('invalid_cookie') 1704 #self.remove() 1705 self.nack(kind=PcktKind.reject.value) 1706 return 1707 1708 shortraw, seid, deid, oreo = raeting.COOKIESTUFF_PACKER.unpack(msg) 1709 1710 if seid != self.remote.fuid or deid != self.remote.nuid: 1711 emsg = "Invalid seid or deid fields in cookie stuff\n" 1712 console.terse(emsg) 1713 self.stack.incStat('invalid_cookie') 1714 #self.remove() 1715 self.nack(kind=PcktKind.reject.value) 1716 return 1717 1718 self.oreo = binascii.hexlify(oreo) 1719 self.remote.publee = nacling.Publican(key=shortraw) 1720 1721 self.initiate() 1722 1723 def initiate(self): 1724 ''' 1725 Send initiate request to cookie response to hello request 1726 ''' 1727 vcipher, vnonce = self.stack.local.priver.encrypt(self.remote.privee.pubraw, 1728 self.remote.pubber.key) 1729 1730 fqdn = self.remote.fqdn 1731 if isinstance(fqdn, unicode): 1732 fqdn = fqdn.encode('ascii', 'ignore') 1733 fqdn = fqdn.ljust(128, b' ')[:128] 1734 1735 stuff = raeting.INITIATESTUFF_PACKER.pack(self.stack.local.priver.pubraw, 1736 vcipher, 1737 vnonce, 1738 fqdn) 1739 1740 cipher, nonce = self.remote.privee.encrypt(stuff, self.remote.publee.key) 1741 1742 oreo = binascii.unhexlify(self.oreo) 1743 body = raeting.INITIATE_PACKER.pack(self.remote.privee.pubraw, 1744 oreo, 1745 cipher, 1746 nonce) 1747 1748 packet = packeting.TxPacket(stack=self.stack, 1749 kind=PcktKind.initiate, 1750 embody=body, 1751 data=self.txData) 1752 try: 1753 packet.pack() 1754 except raeting.PacketError as ex: 1755 console.terse(str(ex) + '\n') 1756 self.stack.incStat("packing_error") 1757 self.remove() 1758 return 1759 1760 self.transmit(packet) 1761 console.concise("Allower {0}. Do Initiate with {1} in {2} at {3}\n".format( 1762 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 1763 1764 def allow(self): 1765 ''' 1766 Process ackInitiate packet 1767 Perform allowment in response to ack to initiate packet 1768 Transmits ack to complete transaction so correspondent knows 1769 ''' 1770 if not self.stack.parseInner(self.rxPacket): 1771 return 1772 1773 self.remote.allowed = True 1774 self.remote.alived = True # fast alive as soon as allowed 1775 self.ackFinal() 1776 1777 def ackFinal(self): 1778 ''' 1779 Send ack to ack Initiate to terminate transaction 1780 This is so both sides wait on acks so transaction is not restarted until 1781 boths sides see completion. 1782 ''' 1783 body = b'' 1784 packet = packeting.TxPacket(stack=self.stack, 1785 kind=PcktKind.ack.value, 1786 embody=body, 1787 data=self.txData) 1788 try: 1789 packet.pack() 1790 except raeting.PacketError as ex: 1791 console.terse(str(ex) + '\n') 1792 self.stack.incStat("packing_error") 1793 self.remove() 1794 return 1795 1796 self.remove() 1797 self.transmit(packet) 1798 1799 console.concise("Allower {0}. Do Ack Final, Done with {1} in {2} at {3}\n".format( 1800 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 1801 self.stack.incStat("allow_initiate_complete") 1802 1803 self.remote.nextSid() # start new session always on successful allow 1804 self.remote.replaceStaleInitiators() 1805 self.stack.dumpRemote(self.remote) 1806 self.remote.sendSavedMessages() # could include messages saved on rejoin 1807 if self.cascade: 1808 self.stack.alive(uid=self.remote.uid, cascade=self.cascade, timeout=self.timeout) 1809 1810 def nack(self, kind=PcktKind.nack.value): 1811 ''' 1812 Send nack to accept response 1813 ''' 1814 body = b'' 1815 packet = packeting.TxPacket(stack=self.stack, 1816 kind=kind, 1817 embody=body, 1818 data=self.txData) 1819 try: 1820 packet.pack() 1821 except raeting.PacketError as ex: 1822 console.terse(str(ex) + '\n') 1823 self.stack.incStat("packing_error") 1824 self.remove() 1825 return 1826 1827 if kind == PcktKind.refuse: 1828 console.terse("Allower {0}. Do Nack Refuse of {1} in {2} at {3}\n".format( 1829 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 1830 elif kind == PcktKind.reject: 1831 console.terse("Allower {0}. Do Nack Reject of {1} in {2} at {3}\n".format( 1832 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 1833 elif kind == PcktKind.nack: 1834 console.terse("Allower {0}. Do Nack of {1} in {2} at {3}\n".format( 1835 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 1836 else: 1837 console.terse("Allower {0}. Invalid nack kind {1}. Do Nack of {2} anyway " 1838 " in {3} at {4}\n".format(self.stack.name, 1839 kind, 1840 self.remote.name, 1841 self.tid, 1842 self.stack.store.stamp)) 1843 kind == PcktKind.nack 1844 1845 self.remove() 1846 self.stack.incStat(self.statKey()) 1847 self.transmit(packet) 1848 1849 def refuse(self): 1850 ''' 1851 Process nack refuse to packet 1852 ''' 1853 if not self.stack.parseInner(self.rxPacket): 1854 return 1855 1856 self.remove() 1857 console.concise("Allower {0}. Refused by {1} in {2} at {3}\n".format( 1858 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 1859 self.stack.incStat(self.statKey()) 1860 1861 def reject(self): 1862 ''' 1863 Process nack reject to packet 1864 terminate in response to nack 1865 ''' 1866 if not self.stack.parseInner(self.rxPacket): 1867 return 1868 1869 self.remote.allowed = False 1870 self.remove() 1871 console.concise("Allower {0}. Rejected by {1} in {2} at {3}\n".format( 1872 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 1873 self.stack.incStat(self.statKey()) 1874 1875 def unjoin(self): 1876 ''' 1877 Process unjoin packet 1878 terminate in response to unjoin 1879 ''' 1880 if not self.stack.parseInner(self.rxPacket): 1881 return 1882 1883 self.remote.joined = False 1884 self.remove() 1885 console.concise("Allower {0}. Rejected unjoin by {1} in {2} at {3}\n".format( 1886 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 1887 self.stack.incStat(self.statKey()) 1888 self.stack.join(uid=self.remote.uid, cascade=self.cascade, timeout=self.timeout) 1889 1890class Allowent(Correspondent): 1891 ''' 1892 RAET protocol Allowent Correspondent class Dual of Allower 1893 CurveCP handshake 1894 ''' 1895 Timeout = 4.0 1896 RedoTimeoutMin = 0.25 # initial timeout 1897 RedoTimeoutMax = 1.0 # max timeout 1898 1899 def __init__(self, redoTimeoutMin=None, redoTimeoutMax=None, **kwa): 1900 ''' 1901 Setup instance 1902 ''' 1903 kwa['kind'] = TrnsKind.allow.value 1904 super(Allowent, self).__init__(**kwa) 1905 1906 self.redoTimeoutMax = redoTimeoutMax or self.RedoTimeoutMax 1907 self.redoTimeoutMin = redoTimeoutMin or self.RedoTimeoutMin 1908 self.redoTimer = StoreTimer(self.stack.store, 1909 duration=self.redoTimeoutMin) 1910 1911 self.oreo = None #keep locally generated oreo around for redos 1912 self.prep() # prepare .txData 1913 1914 def transmit(self, packet): 1915 ''' 1916 Augment transmit with restart of redo timer 1917 ''' 1918 super(Allowent, self).transmit(packet) 1919 self.redoTimer.restart() 1920 1921 def receive(self, packet): 1922 """ 1923 Process received packet belonging to this transaction 1924 """ 1925 super(Allowent, self).receive(packet) # self.rxPacket = packet 1926 1927 if packet.data['tk'] == TrnsKind.allow: 1928 if packet.data['pk'] == PcktKind.hello: 1929 self.hello() 1930 elif packet.data['pk'] == PcktKind.initiate: 1931 self.initiate() 1932 elif packet.data['pk'] == PcktKind.ack: 1933 self.final() 1934 elif packet.data['pk'] == PcktKind.nack: # rejected 1935 self.refuse() 1936 elif packet.data['pk'] == PcktKind.refuse: # refused 1937 self.refuse() 1938 elif packet.data['pk'] == PcktKind.reject: # rejected 1939 self.reject() 1940 1941 def process(self): 1942 ''' 1943 Perform time based processing of transaction 1944 1945 ''' 1946 if self.timeout > 0.0 and self.timer.expired: 1947 self.nack(kind=PcktKind.refuse.value) 1948 console.concise("Allowent {0}. Timed out with {1} in {2} at {3}\n".format( 1949 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 1950 return 1951 1952 # need to perform the check for accepted status and then send accept 1953 if self.redoTimer.expired: 1954 duration = min( 1955 max(self.redoTimeoutMin, 1956 self.redoTimer.duration * 2.0), 1957 self.redoTimeoutMax) 1958 self.redoTimer.restart(duration=duration) 1959 1960 if self.txPacket: 1961 if self.txPacket.data['pk'] == PcktKind.cookie: 1962 self.transmit(self.txPacket) #redo 1963 console.concise("Allowent {0}. Redo Cookie with {1} in {2} at {3}\n".format( 1964 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 1965 self.stack.incStat('redo_cookie') 1966 1967 if self.txPacket.data['pk'] == PcktKind.ack: 1968 self.transmit(self.txPacket) #redo 1969 console.concise("Allowent {0}. Redo Ack with {1} in {2} at {3}\n".format( 1970 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 1971 self.stack.incStat('redo_allow') 1972 1973 def prep(self): 1974 ''' 1975 Prepare .txData 1976 ''' 1977 self.txData.update( #sh=self.stack.local.ha[0], 1978 #sp=self.stack.local.ha[1], 1979 dh=self.remote.ha[0], # maybe needed for index 1980 dp=self.remote.ha[1], # maybe needed for index 1981 se=self.remote.nuid, 1982 de=self.remote.fuid, 1983 tk=self.kind, 1984 cf=self.rmt, 1985 bf=self.bcst, 1986 si=self.sid, 1987 ti=self.tid, ) 1988 1989 def hello(self): 1990 ''' 1991 Process hello packet 1992 ''' 1993 if not self.stack.parseInner(self.rxPacket): 1994 return 1995 1996 joins = self.remote.joinInProcess() 1997 if joins: 1998 emsg = ("Allowent {0}. Attempt to allow while join already in process with {1}. " 1999 "Aborting...\n".format(self.stack.name, self.remote.name)) 2000 console.concise(emsg) 2001 self.stack.incStat('invalid_allow_attempt') 2002 self.nack(kind=PcktKind.refuse.value) 2003 2004 allows = self.remote.allowInProcess() 2005 for allow in allows: 2006 if allow is self: 2007 emsg = ("Allowent {0}. Duplicate allow hello from {1}. " 2008 "Dropping...\n".format(self.stack.name, self.remote.name)) 2009 console.concise(emsg) 2010 self.stack.incStat('duplicate_allow_attempt') 2011 return 2012 2013 if allow.rmt: # is already a correspondent to an allow 2014 emsg = ("Allowent {0}. Another allowent already in process with {1}. " 2015 "Aborting...\n".format(self.stack.name, self.remote.name)) 2016 console.concise(emsg) 2017 self.stack.incStat('redundant_allow_attempt') 2018 self.nack(kind=PcktKind.refuse.value) 2019 return 2020 2021 else: # already initiator allow in process, resolve race condition 2022 if self.stack.local.name < self.remote.name: # abort correspondent 2023 emsg = ("Allowent {0}. Already initiated allow with {1}. " 2024 "Aborting because lesser local name...\n".format( 2025 self.stack.name, self.remote.name)) 2026 console.concise(emsg) 2027 self.stack.incStat('redundant_allow_attempt') 2028 self.nack(kind=PcktKind.refuse.value) 2029 return 2030 2031 else: # abort initiator, could let otherside nack do this 2032 emsg = ("Allowent {0}. Removing initiator allow with {1}. " 2033 "Proceeding because lesser local name...\n".format( 2034 self.stack.name, self.remote.name)) 2035 console.concise(emsg) 2036 allow.nack(kind=PcktKind.refuse.value) 2037 2038 self.remote.allowed = None 2039 2040 if not self.remote.joined: 2041 emsg = "Allowent {0}. Must be joined with {1} first\n".format( 2042 self.stack.name, self.remote.name) 2043 console.terse(emsg) 2044 self.stack.incStat('unjoined_allow_attempt') 2045 self.nack(kind=PcktKind.unjoined.value) 2046 return 2047 2048 self.remote.rekey() # refresh short term keys and .allowed 2049 self.add() 2050 2051 data = self.rxPacket.data 2052 body = self.rxPacket.body.data 2053 2054 if not isinstance(body, bytes): 2055 emsg = "Invalid format of hello packet body\n" 2056 console.terse(emsg) 2057 self.stack.incStat('invalid_hello') 2058 #self.remove() 2059 self.nack(kind=PcktKind.reject.value) 2060 return 2061 2062 if len(body) != raeting.HELLO_PACKER.size: 2063 emsg = "Invalid length of hello packet body\n" 2064 console.terse(emsg) 2065 self.stack.incStat('invalid_hello') 2066 #self.remove() 2067 self.nack(kind=PcktKind.reject.value) 2068 return 2069 2070 plain, shortraw, cipher, nonce = raeting.HELLO_PACKER.unpack(body) 2071 2072 self.remote.publee = nacling.Publican(key=shortraw) 2073 msg = self.stack.local.priver.decrypt(cipher, nonce, self.remote.publee.key) 2074 if msg != plain : 2075 emsg = "Invalid plain not match decrypted cipher\n" 2076 console.terse(emsg) 2077 self.stack.incStat('invalid_hello') 2078 #self.remove() 2079 self.nack(kind=PcktKind.reject.value) 2080 return 2081 2082 self.cookie() 2083 2084 def cookie(self): 2085 ''' 2086 Send Cookie Packet 2087 ''' 2088 oreo = self.stack.local.priver.nonce() 2089 self.oreo = binascii.hexlify(oreo) 2090 2091 stuff = raeting.COOKIESTUFF_PACKER.pack(self.remote.privee.pubraw, 2092 self.remote.nuid, 2093 self.remote.fuid, 2094 oreo) 2095 2096 cipher, nonce = self.stack.local.priver.encrypt(stuff, self.remote.publee.key) 2097 body = raeting.COOKIE_PACKER.pack(cipher, nonce) 2098 packet = packeting.TxPacket(stack=self.stack, 2099 kind=PcktKind.cookie.value, 2100 embody=body, 2101 data=self.txData) 2102 try: 2103 packet.pack() 2104 except raeting.PacketError as ex: 2105 console.terse(str(ex) + '\n') 2106 self.stack.incStat("packing_error") 2107 self.remove() 2108 return 2109 self.transmit(packet) 2110 console.concise("Allowent {0}. Do Cookie with {1} in {2} at {3}\n".format( 2111 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 2112 2113 def initiate(self): 2114 ''' 2115 Process initiate packet 2116 ''' 2117 if not self.stack.parseInner(self.rxPacket): 2118 return 2119 data = self.rxPacket.data 2120 body = self.rxPacket.body.data 2121 2122 if not isinstance(body, bytes): 2123 emsg = "Invalid format of initiate packet body\n" 2124 console.terse(emsg) 2125 self.stack.incStat('invalid_initiate') 2126 #self.remove() 2127 self.nack(kind=PcktKind.reject.value) 2128 return 2129 2130 if len(body) != raeting.INITIATE_PACKER.size: 2131 emsg = "Invalid length of initiate packet body\n" 2132 console.terse(emsg) 2133 self.stack.incStat('invalid_initiate') 2134 #self.remove() 2135 self.nack(kind=PcktKind.reject.value) 2136 return 2137 2138 shortraw, oreo, cipher, nonce = raeting.INITIATE_PACKER.unpack(body) 2139 2140 if shortraw != self.remote.publee.keyraw: 2141 emsg = "Mismatch of short term public key in initiate packet\n" 2142 console.terse(emsg) 2143 self.stack.incStat('invalid_initiate') 2144 #self.remove() 2145 self.nack(kind=PcktKind.reject.value) 2146 return 2147 2148 if (binascii.hexlify(oreo) != self.oreo): 2149 emsg = "Stale or invalid cookie in initiate packet\n" 2150 console.terse(emsg) 2151 self.stack.incStat('invalid_initiate') 2152 #self.remove() 2153 self.nack(kind=PcktKind.reject.value) 2154 return 2155 2156 msg = self.remote.privee.decrypt(cipher, nonce, self.remote.publee.key) 2157 if len(msg) != raeting.INITIATESTUFF_PACKER.size: 2158 emsg = "Invalid length of initiate stuff\n" 2159 console.terse(emsg) 2160 self.stack.incStat('invalid_initiate') 2161 #self.remove() 2162 self.nack(kind=PcktKind.reject.value) 2163 return 2164 2165 pubraw, vcipher, vnonce, fqdn = raeting.INITIATESTUFF_PACKER.unpack(msg) 2166 if pubraw != self.remote.pubber.keyraw: 2167 emsg = "Mismatch of long term public key in initiate stuff\n" 2168 console.terse(emsg) 2169 self.stack.incStat('invalid_initiate') 2170 #self.remove() 2171 self.nack(kind=PcktKind.reject.value) 2172 return 2173 2174 fqdn = fqdn.rstrip(b' ') 2175 lfqdn = self.stack.local.fqdn 2176 if isinstance(lfqdn, unicode): 2177 lfqdn = lfqdn.encode('ascii', 'ignore') 2178 lfqdn = lfqdn.ljust(128, b' ')[:128].rstrip(b' ') 2179 if fqdn != lfqdn: 2180 emsg = ("Mismatch of local fqdn {0} with rxed fqdn {1} in initiate " 2181 "stuff\n".format(lfqdn, fqdn)) 2182 console.terse(emsg) 2183 #self.stack.incStat('invalid_initiate') 2184 #self.remove() 2185 #self.nack(kind=raeting.pcktKinds.reject) 2186 #return 2187 2188 vouch = self.stack.local.priver.decrypt(vcipher, vnonce, self.remote.pubber.key) 2189 if vouch != self.remote.publee.keyraw or vouch != shortraw: 2190 emsg = "Short term key vouch failed\n" 2191 console.terse(emsg) 2192 self.stack.incStat('invalid_initiate') 2193 #self.remove() 2194 self.nack(kind=PcktKind.reject.value) 2195 return 2196 2197 self.ackInitiate() 2198 2199 def ackInitiate(self): 2200 ''' 2201 Send ack to initiate request 2202 ''' 2203 2204 body = b'' 2205 packet = packeting.TxPacket(stack=self.stack, 2206 kind=PcktKind.ack.value, 2207 embody=body, 2208 data=self.txData) 2209 try: 2210 packet.pack() 2211 except raeting.PacketError as ex: 2212 console.terse(str(ex) + '\n') 2213 self.stack.incStat("packing_error") 2214 self.remove() 2215 return 2216 2217 self.transmit(packet) 2218 console.concise("Allowent {0}. Do Ack Initiate with {1} in {2} at {3}\n".format( 2219 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 2220 2221 self.allow() 2222 2223 def allow(self): 2224 ''' 2225 Perform allowment 2226 ''' 2227 self.remote.allowed = True 2228 self.remote.alived = True # Fast alived as soon as allowed 2229 self.remote.nextSid() # start new session always on successful allow 2230 self.remote.replaceStaleInitiators() 2231 self.stack.dumpRemote(self.remote) 2232 2233 def final(self): 2234 ''' 2235 Process ackFinal packet 2236 So that both sides are waiting on acks at the end so does not restart 2237 transaction if ack initiate is dropped 2238 ''' 2239 if not self.stack.parseInner(self.rxPacket): 2240 return 2241 2242 self.remove() 2243 console.concise("Allowent {0}. Done with {1} in {2} at {3}\n".format( 2244 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 2245 self.stack.incStat("allow_correspond_complete") 2246 self.remote.sendSavedMessages() # could include messages saved on rejoin 2247 2248 def refuse(self): 2249 ''' 2250 Process nack refuse packet 2251 ''' 2252 if not self.stack.parseInner(self.rxPacket): 2253 return 2254 2255 self.remove() 2256 console.concise("Allowent {0}. Refused by {1} in {2} at {3}n".format( 2257 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 2258 self.stack.incStat(self.statKey()) 2259 2260 def reject(self): 2261 ''' 2262 Process nack packet 2263 terminate in response to nack 2264 ''' 2265 if not self.stack.parseInner(self.rxPacket): 2266 return 2267 2268 self.remote.allowed = False 2269 self.remove() 2270 console.concise("Allowent {0}. Rejected by {1} in {2} at {3}\n".format( 2271 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 2272 self.stack.incStat(self.statKey()) 2273 2274 def nack(self, kind=PcktKind.nack.value): 2275 ''' 2276 Send nack to terminate allow transaction 2277 ''' 2278 body = b'' 2279 packet = packeting.TxPacket(stack=self.stack, 2280 kind=kind, 2281 embody=body, 2282 data=self.txData) 2283 try: 2284 packet.pack() 2285 except raeting.PacketError as ex: 2286 console.terse(str(ex) + '\n') 2287 self.stack.incStat("packing_error") 2288 self.remove() 2289 return 2290 2291 if kind==PcktKind.refuse: 2292 console.terse("Allowent {0}. Do Nack Refuse of {1} in {2} at {3}\n".format( 2293 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 2294 elif kind==PcktKind.reject: 2295 console.concise("Allowent {0}. Do Nack Reject {1} in {2} at {3}\n".format( 2296 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 2297 elif kind==PcktKind.unjoined: 2298 console.concise("Allowent {0}. Do Nack Unjoined {1} in {2} at {3}\n".format( 2299 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 2300 elif kind == PcktKind.nack: 2301 console.terse("Allowent {0}. Do Nack of {1} in {2} at {3}\n".format( 2302 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 2303 else: 2304 console.terse("Allowent {0}. Invalid nack kind {1}. Do Nack of {2} anyway " 2305 " in {3} at {4}\n".format(self.stack.name, 2306 kind, 2307 self.remote.name, 2308 self.tid, 2309 self.stack.store.stamp)) 2310 kind == PcktKind.nack 2311 2312 self.remove() 2313 self.transmit(packet) 2314 self.stack.incStat(self.statKey()) 2315 2316class Aliver(Initiator): 2317 ''' 2318 RAET protocol Aliver Initiator class Dual of Alivent 2319 Sends keep alive heatbeat messages to detect presence 2320 2321 2322 update alived status of .remote 2323 only use .remote.refresh to update 2324 2325 ''' 2326 Timeout = 2.0 2327 RedoTimeoutMin = 0.25 # initial timeout 2328 RedoTimeoutMax = 1.0 # max timeout 2329 2330 def __init__(self, redoTimeoutMin=None, redoTimeoutMax=None, 2331 cascade=False, **kwa): 2332 ''' 2333 Setup instance 2334 ''' 2335 kwa['kind'] = TrnsKind.alive.value 2336 super(Aliver, self).__init__(**kwa) 2337 2338 self.cascade = cascade 2339 2340 self.redoTimeoutMax = redoTimeoutMax or self.RedoTimeoutMax 2341 self.redoTimeoutMin = redoTimeoutMin or self.RedoTimeoutMin 2342 self.redoTimer = StoreTimer(self.stack.store, 2343 duration=self.redoTimeoutMin) 2344 2345 self.sid = self.remote.sid 2346 self.tid = self.remote.nextTid() 2347 self.prep() # prepare .txData 2348 2349 def transmit(self, packet): 2350 ''' 2351 Augment transmit with restart of redo timer 2352 ''' 2353 super(Aliver, self).transmit(packet) 2354 self.redoTimer.restart() 2355 2356 def receive(self, packet): 2357 """ 2358 Process received packet belonging to this transaction 2359 """ 2360 super(Aliver, self).receive(packet) 2361 2362 if packet.data['tk'] == TrnsKind.alive: 2363 if packet.data['pk'] == PcktKind.ack: 2364 self.complete() 2365 elif packet.data['pk'] == PcktKind.nack: # refused 2366 self.refuse() 2367 elif packet.data['pk'] == PcktKind.refuse: # refused 2368 self.refuse() 2369 elif packet.data['pk'] == PcktKind.unjoined: # unjoin 2370 self.unjoin() 2371 elif packet.data['pk'] == PcktKind.unallowed: # unallow 2372 self.unallow() 2373 elif packet.data['pk'] == PcktKind.reject: # rejected 2374 self.reject() 2375 2376 def process(self): 2377 ''' 2378 Perform time based processing of transaction 2379 ''' 2380 if self.timeout > 0.0 and self.timer.expired: 2381 console.concise("Aliver {0}. Timed out with {1} in {2} at {3}\n".format( 2382 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 2383 self.remove() 2384 self.remote.refresh(alived=False) # mark as dead 2385 return 2386 2387 # need keep sending message until completed or timed out 2388 if self.redoTimer.expired: 2389 duration = min( 2390 max(self.redoTimeoutMin, 2391 self.redoTimer.duration * 2.0), 2392 self.redoTimeoutMax) 2393 self.redoTimer.restart(duration=duration) 2394 if self.txPacket: 2395 if self.txPacket.data['pk'] == PcktKind.request: 2396 self.transmit(self.txPacket) # redo 2397 console.concise("Aliver {0}. Redo with {1} in {2} at {3}\n".format( 2398 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 2399 self.stack.incStat('redo_alive') 2400 2401 def prep(self): 2402 ''' 2403 Prepare .txData 2404 ''' 2405 self.txData.update( #sh=self.stack.local.ha[0], 2406 #sp=self.stack.local.ha[1], 2407 dh=self.remote.ha[0], # maybe needed for index 2408 dp=self.remote.ha[1], # maybe needed for index 2409 se=self.remote.nuid, 2410 de=self.remote.fuid, 2411 tk=self.kind, 2412 cf=self.rmt, 2413 bf=self.bcst, 2414 si=self.sid, 2415 ti=self.tid,) 2416 2417 def alive(self, body=None): 2418 ''' 2419 Send message 2420 ''' 2421 if not self.remote.joined: 2422 emsg = "Aliver {0}. Must be joined with {1} first\n".format( 2423 self.stack.name, self.remote.name) 2424 console.terse(emsg) 2425 self.stack.incStat('unjoined_remote') 2426 self.stack.join(uid=self.remote.uid, cascade=self.cascade, timeout=self.timeout) 2427 return 2428 2429 if not self.remote.allowed: 2430 emsg = "Aliver {0}. Must be allowed with {1} first\n".format( 2431 self.stack.name, self.remote.name) 2432 console.terse(emsg) 2433 self.stack.incStat('unallowed_remote') 2434 self.stack.allow(uid=self.remote.uid, cascade=self.cascade, timeout=self.timeout) 2435 return 2436 2437 self.remote.refresh(alived=None) #Restart timer but do not change alived status 2438 self.add() 2439 2440 body = odict() 2441 packet = packeting.TxPacket(stack=self.stack, 2442 kind=PcktKind.request.value, 2443 embody=body, 2444 data=self.txData) 2445 try: 2446 packet.pack() 2447 except raeting.PacketError as ex: 2448 console.terse(str(ex) + '\n') 2449 self.stack.incStat("packing_error") 2450 self.remove() 2451 return 2452 self.transmit(packet) 2453 console.concise("Aliver {0}. Do Alive with {1} in {2} at {3}\n".format( 2454 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 2455 2456 def complete(self): 2457 ''' 2458 Process ack packet. Complete transaction and remove 2459 ''' 2460 if not self.stack.parseInner(self.rxPacket): 2461 return 2462 self.remote.refresh(alived=True) # restart timer mark as alive 2463 self.remove() 2464 console.concise("Aliver {0}. Done with {1} in {2} at {3}\n".format( 2465 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 2466 self.stack.incStat("alive_complete") 2467 2468 def refuse(self): 2469 ''' 2470 Process nack refuse packet 2471 terminate in response to nack 2472 ''' 2473 if not self.stack.parseInner(self.rxPacket): 2474 return 2475 self.remote.refresh(alived=None) # restart timer do not change status 2476 self.remove() 2477 console.concise("Aliver {0}. Refused by {1} in {2} at {3}\n".format( 2478 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 2479 self.stack.incStat(self.statKey()) 2480 2481 def reject(self): 2482 ''' 2483 Process nack reject packet 2484 terminate in response to nack 2485 ''' 2486 if not self.stack.parseInner(self.rxPacket): 2487 return 2488 self.remote.refresh(alived=False) # restart timer set status to False 2489 self.remove() 2490 console.concise("Aliver {0}. Rejected by {1} in {2} at {3}\n".format( 2491 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 2492 self.stack.incStat(self.statKey()) 2493 2494 def unjoin(self): 2495 ''' 2496 Process unjoin packet 2497 terminate in response to unjoin 2498 ''' 2499 if not self.stack.parseInner(self.rxPacket): 2500 return 2501 self.remote.refresh(alived=None) # restart timer do not change status 2502 self.remote.joined = False 2503 self.remove() 2504 console.concise("Aliver {0}. Refused unjoin by {1} in {2} at {3}\n".format( 2505 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 2506 self.stack.incStat(self.statKey()) 2507 self.stack.join(uid=self.remote.uid, cascade=self.cascade, timeout=self.timeout) 2508 2509 def unallow(self): 2510 ''' 2511 Process unallow nack packet 2512 terminate in response to unallow 2513 ''' 2514 if not self.stack.parseInner(self.rxPacket): 2515 return 2516 self.remote.refresh(alived=None) # restart timer do not change status 2517 self.remote.allowed = False 2518 self.remove() 2519 console.concise("Aliver {0}. Refused unallow by {1} in {2} at {3}\n".format( 2520 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 2521 self.stack.incStat(self.statKey()) 2522 self.stack.allow(uid=self.remote.uid, cascade=self.cascade, timeout=self.timeout) 2523 2524class Alivent(Correspondent): 2525 ''' 2526 RAET protocol Alivent Correspondent class Dual of Aliver 2527 Keep alive heartbeat 2528 ''' 2529 Timeout = 10.0 2530 2531 def __init__(self, **kwa): 2532 ''' 2533 Setup instance 2534 ''' 2535 kwa['kind'] = TrnsKind.alive.value 2536 super(Alivent, self).__init__(**kwa) 2537 2538 self.prep() # prepare .txData 2539 2540 def receive(self, packet): 2541 """ 2542 Process received packet belonging to this transaction 2543 """ 2544 super(Alivent, self).receive(packet) 2545 2546 if packet.data['tk'] == TrnsKind.alive: 2547 if packet.data['pk'] == PcktKind.request: 2548 self.alive() 2549 2550 def process(self): 2551 ''' 2552 Perform time based processing of transaction 2553 2554 ''' 2555 if self.timeout > 0.0 and self.timer.expired: 2556 self.nack() #manage restarts alive later 2557 console.concise("Alivent {0}. Timed out with {1} in {2} at {3}\n".format( 2558 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 2559 return 2560 2561 def prep(self): 2562 ''' 2563 Prepare .txData 2564 ''' 2565 self.txData.update( #sh=self.stack.local.ha[0], 2566 #sp=self.stack.local.ha[1], 2567 dh=self.remote.ha[0], # maybe needed for index 2568 dp=self.remote.ha[1], # maybe needed for index 2569 se=self.remote.nuid, 2570 de=self.remote.fuid, 2571 tk=self.kind, 2572 cf=self.rmt, 2573 bf=self.bcst, 2574 si=self.sid, 2575 ti=self.tid,) 2576 2577 def alive(self): 2578 ''' 2579 Process alive packet 2580 ''' 2581 if not self.stack.parseInner(self.rxPacket): 2582 return 2583 2584 if not self.remote.joined: 2585 self.remote.refresh(alived=None) # received signed packet so its alive 2586 emsg = "Alivent {0}. Must be joined with {1} first\n".format( 2587 self.stack.name, self.remote.name) 2588 console.terse(emsg) 2589 self.stack.incStat('unjoined_alive_attempt') 2590 self.nack(kind=PcktKind.unjoined.value) 2591 return 2592 2593 if not self.remote.allowed: 2594 self.remote.refresh(alived=None) # received signed packet so its alive 2595 emsg = "Alivent {0}. Must be allowed with {1} first\n".format( 2596 self.stack.name, self.remote.name) 2597 console.terse(emsg) 2598 self.stack.incStat('unallowed_alive_attempt') 2599 self.nack(kind=PcktKind.unallowed.value) 2600 return 2601 2602 self.add() 2603 2604 data = self.rxPacket.data 2605 body = self.rxPacket.body.data 2606 2607 body = odict() 2608 packet = packeting.TxPacket(stack=self.stack, 2609 kind=PcktKind.ack.value, 2610 embody=body, 2611 data=self.txData) 2612 try: 2613 packet.pack() 2614 except raeting.PacketError as ex: 2615 console.terse(str(ex) + '\n') 2616 self.stack.incStat("packing_error") 2617 self.remove() 2618 return 2619 2620 self.transmit(packet) 2621 console.concise("Alivent {0}. Do ack alive with {1} in {2} at {3}\n".format( 2622 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 2623 self.remote.refresh(alived=True) 2624 self.remove() 2625 console.concise("Alivent {0}. Done with {1} in {2} at {3}\n".format( 2626 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 2627 self.stack.incStat("alive_complete") 2628 2629 def nack(self, kind=PcktKind.nack.value): 2630 ''' 2631 Send nack to terminate alive transaction 2632 ''' 2633 body = odict() 2634 packet = packeting.TxPacket(stack=self.stack, 2635 kind=kind, 2636 embody=body, 2637 data=self.txData) 2638 try: 2639 packet.pack() 2640 except raeting.PacketError as ex: 2641 console.terse(str(ex) + '\n') 2642 self.stack.incStat("packing_error") 2643 self.remove() 2644 return 2645 2646 if kind == PcktKind.refuse: 2647 console.terse("Alivent {0}. Do Refuse of {1} in {2} at {3}\n".format( 2648 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 2649 elif kind == PcktKind.unjoined: 2650 console.terse("Alivent {0}. Do Unjoined of {1} in {2} at {3}\n".format( 2651 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 2652 elif kind == PcktKind.unallowed: 2653 console.terse("Alivent {0}. Do Unallowed of {1} in {2} at {3}\n".format( 2654 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 2655 elif kind == PcktKind.reject: 2656 console.concise("Alivent {0}. Do Reject {1} in {2} at {3}\n".format( 2657 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 2658 elif kind == PcktKind.nack: 2659 console.terse("Alivent {0}. Do Nack of {1} in {2} at {3}\n".format( 2660 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 2661 else: 2662 console.terse("Alivent {0}. Invalid nack kind {1}. Do Nack of {2} anyway " 2663 " in {3} at {4}\n".format(self.stack.name, 2664 kind, 2665 self.remote.name, 2666 self.tid, 2667 self.stack.store.stamp)) 2668 kind == PcktKind.nack 2669 2670 self.transmit(packet) 2671 self.remove() 2672 2673 self.stack.incStat(self.statKey()) 2674 2675class Messenger(Initiator): 2676 ''' 2677 RAET protocol Messenger Initiator class Dual of Messengent 2678 Generic messages 2679 ''' 2680 Timeout = 0.0 2681 RedoTimeoutMin = 0.2 # initial timeout 2682 RedoTimeoutMax = 0.5 # max timeout 2683 2684 def __init__(self, redoTimeoutMin=None, redoTimeoutMax=None, burst=0, **kwa): 2685 ''' 2686 Setup instance 2687 ''' 2688 kwa['kind'] = TrnsKind.message.value 2689 super(Messenger, self).__init__(**kwa) 2690 2691 self.redoTimeoutMax = redoTimeoutMax or self.RedoTimeoutMax 2692 self.redoTimeoutMin = redoTimeoutMin or self.RedoTimeoutMin 2693 self.redoTimer = StoreTimer(self.stack.store, 2694 duration=self.redoTimeoutMin) 2695 2696 self.burst = max(0, int(burst)) # BurstSize 2697 self.misseds = oset() # ordered set of currently missed segments 2698 self.acked = False # Have received at least one ack 2699 2700 self.sid = self.remote.sid 2701 self.tid = self.remote.nextTid() 2702 self.prep() # prepare .txData 2703 self.tray = packeting.TxTray(stack=self.stack) 2704 2705 def transmit(self, packet): 2706 ''' 2707 Augment transmit with restart of redo timer 2708 ''' 2709 super(Messenger, self).transmit(packet) 2710 self.redoTimer.restart() 2711 2712 def receive(self, packet): 2713 """ 2714 Process received packet belonging to this transaction 2715 """ 2716 super(Messenger, self).receive(packet) 2717 2718 if packet.data['tk'] == TrnsKind.message: 2719 if packet.data['pk'] == PcktKind.ack: # more 2720 self.acked = True 2721 self.another() # continue message 2722 elif packet.data['pk'] == PcktKind.resend: # resend 2723 self.acked = True 2724 self.resend() # resend missed segments 2725 elif packet.data['pk'] == PcktKind.done: # completed 2726 self.acked = True 2727 self.complete() 2728 elif packet.data['pk'] == PcktKind.nack: # rejected 2729 self.reject() 2730 2731 def process(self): 2732 ''' 2733 Perform time based processing of transaction 2734 ''' 2735 if self.timeout > 0.0 and self.timer.expired: 2736 self.remove() 2737 console.concise("Messenger {0}. Timed out with {1} in {2} at {3}\n".format( 2738 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 2739 return 2740 2741 # keep sending message until completed or timed out 2742 if self.redoTimer.expired: 2743 duration = min( 2744 max(self.redoTimeoutMin, 2745 self.redoTimer.duration * 2.0), 2746 self.redoTimeoutMax) 2747 self.redoTimer.restart(duration=duration) 2748 if self.txPacket: 2749 if self.txPacket.data['pk'] in [PcktKind.message]: 2750 if self.acked and not self.txPacket.data['af']: # turn on AgnFlag if not set 2751 self.txPacket.data.update(af=True) 2752 self.txPacket.repack() 2753 self.transmit(self.txPacket) # redo 2754 console.concise("Messenger {0}. Redo Segment {1} with " 2755 "{2} in {3} at {4}\n".format( 2756 self.stack.name, 2757 self.txPacket.data['sn'], 2758 self.remote.name, 2759 self.tid, 2760 self.stack.store.stamp)) 2761 self.stack.incStat('redo_segment') 2762 2763 def prep(self): 2764 ''' 2765 Prepare .txData 2766 ''' 2767 self.txData.update( #sh=self.stack.local.ha[0], 2768 #sp=self.stack.local.ha[1], 2769 dh=self.remote.ha[0], # maybe needed for index 2770 dp=self.remote.ha[1], # maybe needed for index 2771 se=self.remote.nuid, 2772 de=self.remote.fuid, 2773 tk=self.kind, 2774 cf=self.rmt, 2775 bf=self.bcst, 2776 si=self.sid, 2777 ti=self.tid,) 2778 2779 def message(self, body=None): 2780 ''' 2781 Send message or part of message. So repeatedly called until complete 2782 ''' 2783 2784 if not self.remote.allowed: 2785 emsg = "Messenger {0}. Must be allowed with {1} first\n".format( 2786 self.stack.name, self.remote.name) 2787 console.terse(emsg) 2788 self.stack.incStat('unallowed_remote') 2789 self.remove() 2790 return 2791 2792 if not self.tray.packets: 2793 try: 2794 self.tray.pack(data=self.txData, body=body) 2795 except raeting.PacketError as ex: 2796 console.terse(str(ex) + '\n') 2797 self.stack.incStat("packing_error") 2798 self.remove() 2799 return 2800 2801 if self.tray.current >= len(self.tray.packets): 2802 emsg = "Messenger {0}. Current packet {1} greater than num packets {2}\n".format( 2803 self.stack.name, self.tray.current, len(self.tray.packets)) 2804 console.terse(emsg) 2805 self.remove() 2806 return 2807 2808 if self.index not in self.remote.transactions: 2809 self.add() 2810 elif self.remote.transactions[self.index] != self: 2811 emsg = "Messenger {0}. Remote {1} Index collision of {2} in {3} at {4}\n".format( 2812 self.stack.name, 2813 self.remote.name, 2814 self.index, 2815 self.tid, 2816 self.stack.store.stamp) 2817 console.terse(emsg) 2818 self.incStat('message_index_collision') 2819 self.remove() 2820 return 2821 2822 burst = (min(self.burst, (len(self.tray.packets) - self.tray.current)) 2823 if self.burst else (len(self.tray.packets) - self.tray.current)) 2824 2825 packets = self.tray.packets[self.tray.current:self.tray.current + burst] 2826 if packets: 2827 last = packets[-1] 2828 last.data.update(wf=True) # set wait flag on last packet in burst 2829 last.repack() 2830 2831 for packet in packets: 2832 self.transmit(packet) 2833 self.tray.last = self.tray.current 2834 self.tray.current += 1 2835 self.stack.incStat("message_segment_tx") 2836 console.concise("Messenger {0}. Do Message Segment {1} with {2} in {3} at {4}\n".format( 2837 self.stack.name, self.tray.last, self.remote.name, self.tid, self.stack.store.stamp)) 2838 2839 def another(self): 2840 ''' 2841 Process ack packet and continue sending 2842 ''' 2843 if not self.stack.parseInner(self.rxPacket): 2844 return 2845 self.remote.refresh(alived=True) 2846 self.stack.incStat("message_ack_rx") 2847 2848 if self.misseds: 2849 self.sendMisseds() 2850 else: 2851 current = self.rxPacket.data['sn'] + 1 2852 if self.tray.current > current: 2853 console.concise("Messenger {0}. Current {1} is ahead of requested {2}. Adjust.\n".format( 2854 self.stack.name, self.tray.current, current)) 2855 self.tray.current = current 2856 self.tray.last = current - 1 2857 if self.tray.current < len(self.tray.packets): 2858 self.message() # continue message 2859 2860 def resend(self): 2861 ''' 2862 Process resend packet and update .misseds list of missing packets 2863 Then send misseds 2864 ''' 2865 if not self.stack.parseInner(self.rxPacket): 2866 return 2867 2868 self.remote.refresh(alived=True) 2869 self.stack.incStat('message_resend_rx') 2870 2871 data = self.rxPacket.data 2872 body = self.rxPacket.body.data 2873 2874 misseds = body.get('misseds') # indexes of missed segments 2875 if misseds: 2876 if not self.tray.packets: 2877 emsg = "Invalid resend request '{0}'\n".format(misseds) 2878 console.terse(emsg) 2879 self.stack.incStat('invalid_resend') 2880 return 2881 2882 for m in misseds: 2883 try: 2884 packet = self.tray.packets[m] 2885 except IndexError as ex: 2886 #console.terse(str(ex) + '\n') 2887 console.terse("Invalid misseds segment number {0}\n".format(m)) 2888 self.stack.incStat("invalid_misseds") 2889 return 2890 self.misseds.add(packet) # add segment, set only adds if unique 2891 self.sendMisseds() 2892 2893 def sendMisseds(self): 2894 ''' 2895 Send a burst of missed packets 2896 ''' 2897 if self.misseds: 2898 burst = (min(self.burst, (len(self.misseds))) if 2899 self.burst else len(self.misseds)) 2900 # make list of first burst number of packets 2901 misseds = [missed for missed in self.misseds][:burst] 2902 for packet in misseds[:-1]: 2903 repack = False 2904 if not packet.data['af']: # turn on again flag if not set 2905 packet.data.update(af=True) 2906 repack = True 2907 if packet.data['wf']: # turn off wait flag if set 2908 packet.data.update(wf=False) 2909 repack = True 2910 if repack: 2911 packet.repack() 2912 for packet in misseds[-1:]: # last packet 2913 repack = False 2914 if not packet.data['af']: # turn on again flag is not set 2915 packet.data.update(af=True) 2916 repack = True 2917 if not packet.data['wf']: # turn on wait flag if not set 2918 packet.data.update(wf=True) 2919 repack = True 2920 if repack: 2921 packet.repack() 2922 2923 for packet in misseds: 2924 self.transmit(packet) 2925 self.stack.incStat("message_segment_tx") 2926 console.concise("Messenger {0}. Do Resend Message Segment " 2927 "{1} with {2} in {3} at {4}\n".format( 2928 self.stack.name, 2929 packet.data['sn'], 2930 self.remote.name, 2931 self.tid, 2932 self.stack.store.stamp)) 2933 self.misseds.discard(packet) # remove from self.misseds 2934 2935 def complete(self): 2936 ''' 2937 Process Done Ack 2938 Complete transaction and remove 2939 ''' 2940 if not self.stack.parseInner(self.rxPacket): 2941 return 2942 2943 self.remote.refresh(alived=True) 2944 self.stack.incStat('message_complete_rx') 2945 2946 self.remove() 2947 console.concise("Messenger {0}. Done with {1} in {2} at {3}\n".format( 2948 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 2949 self.stack.incStat("message_initiate_complete") 2950 2951 def reject(self): 2952 ''' 2953 Process nack packet 2954 terminate in response to nack 2955 ''' 2956 if not self.stack.parseInner(self.rxPacket): 2957 return 2958 2959 self.remote.refresh(alived=True) 2960 self.stack.incStat('message_reject_rx') 2961 2962 self.remove() 2963 console.concise("Messenger {0}. Rejected by {1} in {2} at {3}\n".format( 2964 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 2965 self.stack.incStat(self.statKey()) 2966 2967 def nack(self): 2968 ''' 2969 Send nack to terminate transaction 2970 ''' 2971 body = odict() 2972 packet = packeting.TxPacket(stack=self.stack, 2973 kind=PcktKind.nack.value, 2974 embody=body, 2975 data=self.txData) 2976 try: 2977 packet.pack() 2978 except raeting.PacketError as ex: 2979 console.terse(str(ex) + '\n') 2980 self.stack.incStat("packing_error") 2981 self.remove() 2982 return 2983 2984 self.transmit(packet) 2985 self.stack.incStat('message_nack_tx') 2986 self.remove() 2987 console.concise("Messenger {0}. Do Nack Reject of {1} in {2} at {3}\n".format( 2988 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 2989 self.stack.incStat(self.statKey()) 2990 2991class Messengent(Correspondent): 2992 ''' 2993 RAET protocol Messengent Correspondent class Dual of Messenger 2994 Generic Messages 2995 ''' 2996 Timeout = 0.0 2997 RedoTimeoutMin = 0.2 # initial timeout 2998 RedoTimeoutMax = 0.5 # max timeout 2999 3000 def __init__(self, redoTimeoutMin=None, redoTimeoutMax=None, **kwa): 3001 ''' 3002 Setup instance 3003 ''' 3004 kwa['kind'] = TrnsKind.message.value 3005 super(Messengent, self).__init__(**kwa) 3006 3007 self.redoTimeoutMax = redoTimeoutMax or self.RedoTimeoutMax 3008 self.redoTimeoutMin = redoTimeoutMin or self.RedoTimeoutMin 3009 self.redoTimer = StoreTimer(self.stack.store, 3010 duration=self.redoTimeoutMin) 3011 3012 self.wait = False # wf wait flag 3013 self.lowest = None 3014 self.prep() # prepare .txData 3015 self.tray = packeting.RxTray(stack=self.stack) 3016 3017 def transmit(self, packet): 3018 ''' 3019 Augment transmit with restart of redo timer 3020 ''' 3021 super(Messengent, self).transmit(packet) 3022 self.redoTimer.restart() 3023 3024 def receive(self, packet): 3025 """ 3026 Process received packet belonging to this transaction 3027 """ 3028 super(Messengent, self).receive(packet) 3029 3030 # resent message 3031 if packet.data['tk'] == TrnsKind.message: 3032 if packet.data['pk'] == PcktKind.message: 3033 self.message() 3034 elif packet.data['pk'] == PcktKind.nack: # rejected 3035 self.reject() 3036 3037 def process(self): 3038 ''' 3039 Perform time based processing of transaction 3040 3041 ''' 3042 if self.timeout > 0.0 and self.timer.expired: 3043 self.nack() 3044 console.concise("Messengent {0}. Timed out with {1} in {2} at {3}\n".format( 3045 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 3046 return 3047 3048 if self.redoTimer.expired: 3049 duration = min( 3050 max(self.redoTimeoutMin, 3051 self.redoTimer.duration * 2.0), 3052 self.redoTimeoutMax) 3053 self.redoTimer.restart(duration=duration) 3054 3055 if self.tray.complete: 3056 self.complete() 3057 else: 3058 misseds = self.tray.missing(begin=self.lowest) 3059 if misseds: # resent missed segments 3060 self.lowest = misseds[0] 3061 self.resend(misseds) 3062 else: # always ask for more here 3063 self.ack() 3064 3065 def prep(self): 3066 ''' 3067 Prepare .txData 3068 ''' 3069 self.txData.update( #sh=self.stack.local.ha[0], 3070 #sp=self.stack.local.ha[1], 3071 dh=self.remote.ha[0], # maybe needed for index 3072 dp=self.remote.ha[1], # maybe needed for index 3073 se=self.remote.nuid, 3074 de=self.remote.fuid, 3075 tk=self.kind, 3076 cf=self.rmt, 3077 bf=self.bcst, 3078 wf=self.rxPacket.data['wf'], # was self.wait 3079 si=self.sid, 3080 ti=self.tid, 3081 ck=self.rxPacket.data['ck'], # so acks use same coat kind encrypted 3082 fk=self.rxPacket.data['fk'], # so acks use same foot kind signed 3083 ) 3084 3085 def message(self): 3086 ''' 3087 Process message packet. Called repeatedly for each packet in message 3088 ''' 3089 if not self.remote.allowed: 3090 emsg = "Messengent {0}. Must be allowed with {1} first\n".format( 3091 self.stack.name, self.remote.name) 3092 console.terse(emsg) 3093 self.stack.incStat('unallowed_message_attempt') 3094 self.nack() 3095 return 3096 3097 try: 3098 body = self.tray.parse(self.rxPacket) 3099 except raeting.PacketError as ex: 3100 console.terse(str(ex) + '\n') 3101 self.incStat('parsing_message_error') 3102 self.nack() 3103 return 3104 3105 if self.index not in self.remote.transactions: 3106 self.add() 3107 elif self.remote.transactions[self.index] != self: 3108 emsg = "Messengent {0}. Remote {1} Index collision of {2} in {3} at {4}\n".format( 3109 self.stack.name, 3110 self.remote.name, 3111 self.index, 3112 self.tid, 3113 self.stack.store.stamp) 3114 console.terse(emsg) 3115 self.incStat('message_index_collision') 3116 self.nack() 3117 return 3118 3119 self.remote.refresh(alived=True) 3120 self.stack.incStat("message_segment_rx") 3121 3122 self.wait = self.rxPacket.data['wf'] # sender is waiting for ack 3123 3124 if self.tray.complete: 3125 self.complete() 3126 elif self.wait: # ask for more if sender waiting for ack 3127 misseds = self.tray.missing(begin=self.lowest) 3128 if misseds: # resent missed segments 3129 self.lowest = misseds[0] 3130 self.resend(misseds) 3131 else: 3132 self.ack() 3133 3134 def ack(self): 3135 ''' 3136 Send ack to message 3137 ''' 3138 body = odict() 3139 packet = packeting.TxPacket(stack=self.stack, 3140 kind=PcktKind.ack.value, 3141 embody=body, 3142 data=self.txData) 3143 packet.data['sn'] = self.tray.highest 3144 try: 3145 packet.pack() 3146 except raeting.PacketError as ex: 3147 console.terse(str(ex) + '\n') 3148 self.stack.incStat("packing_error") 3149 self.remove() 3150 return 3151 self.transmit(packet) 3152 self.stack.incStat("message_more_ack") 3153 console.concise("Messengent {0}. Do Ack More from {1} on Segment {2} with {3} in {4} at {5}\n".format( 3154 self.stack.name, 3155 self.tray.highest + 1, 3156 self.rxPacket.data['sn'], 3157 self.remote.name, 3158 self.tid, 3159 self.stack.store.stamp)) 3160 3161 def resend(self, misseds): 3162 ''' 3163 Send resend request(s) for missing packets 3164 ''' 3165 while misseds: 3166 if len(misseds) > 64: 3167 remainders = misseds[64:] # only do at most 64 at a time 3168 misseds = misseds[:64] 3169 else: 3170 remainders = [] 3171 3172 body = odict(misseds=misseds) 3173 packet = packeting.TxPacket(stack=self.stack, 3174 kind=PcktKind.resend.value, 3175 embody=body, 3176 data=self.txData) 3177 try: 3178 packet.pack() 3179 except raeting.PacketError as ex: 3180 console.terse(str(ex) + '\n') 3181 self.stack.incStat("packing_error") 3182 self.remove() 3183 return 3184 self.transmit(packet) 3185 self.stack.incStat("message_resend_tx") 3186 console.concise("Messengent {0}. Do Resend Segments {1} with {2} in {3} at {4}\n".format( 3187 self.stack.name, 3188 misseds, 3189 self.remote.name, 3190 self.tid, 3191 self.stack.store.stamp)) 3192 misseds = remainders 3193 3194 def complete(self): 3195 ''' 3196 Complete transaction and remove 3197 ''' 3198 self.done() 3199 console.verbose("{0} received message body\n{1}\n".format( 3200 self.stack.name, self.tray.body)) 3201 # application layer authorizaiton needs to know who sent the message 3202 self.stack.rxMsgs.append((self.tray.body, self.remote.name)) 3203 self.remove() 3204 console.concise("Messengent {0}. Complete with {1} in {2} at {3}\n".format( 3205 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 3206 self.stack.incStat("messengent_correspond_complete") 3207 3208 def done(self): 3209 ''' 3210 Send done ack to complete message 3211 ''' 3212 body = odict() 3213 packet = packeting.TxPacket(stack=self.stack, 3214 kind=PcktKind.done.value, 3215 embody=body, 3216 data=self.txData) 3217 try: 3218 packet.pack() 3219 except raeting.PacketError as ex: 3220 console.terse(str(ex) + '\n') 3221 self.stack.incStat("packing_error") 3222 self.remove() 3223 return 3224 self.transmit(packet) 3225 self.stack.incStat("message_complete_ack") 3226 console.concise("Messengent {0}. Do Ack Done Message on Segment {1} with {2} in {3} at {4}\n".format( 3227 self.stack.name, 3228 self.rxPacket.data['sn'], 3229 self.remote.name, 3230 self.tid, 3231 self.stack.store.stamp)) 3232 3233 def reject(self): 3234 ''' 3235 Process nack packet 3236 terminate in response to nack 3237 ''' 3238 if not self.stack.parseInner(self.rxPacket): 3239 return 3240 3241 self.remote.refresh(alived=True) 3242 self.stack.incStat("message_reject_nack") 3243 3244 self.remove() 3245 console.concise("Messengent {0}. Rejected by {1} in {2} at {3}\n".format( 3246 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 3247 self.stack.incStat(self.statKey()) 3248 3249 def nack(self): 3250 ''' 3251 Send nack to terminate messenger transaction 3252 ''' 3253 body = odict() 3254 packet = packeting.TxPacket(stack=self.stack, 3255 kind=PcktKind.nack.value, 3256 embody=body, 3257 data=self.txData) 3258 try: 3259 packet.pack() 3260 except raeting.PacketError as ex: 3261 console.terse(str(ex) + '\n') 3262 self.stack.incStat("packing_error") 3263 self.remove() 3264 return 3265 3266 self.transmit(packet) 3267 self.remove() 3268 console.concise("Messagent {0}. Do Nack Reject of {1} in {2} at {3}\n".format( 3269 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp)) 3270 self.stack.incStat(self.statKey()) 3271 3272 def remove(self, remote=None, index=None): 3273 self.remote.addDoneTransaction(self.tid) 3274 super(Messengent, self).remove(remote, index) 3275