1# -*- coding: utf-8 -*-
2'''
3stacking.py raet protocol stacking classes
4'''
5# pylint: skip-file
6# pylint: disable=W0611
7
8# Import python libs
9import socket
10import binascii
11import struct
12
13try:
14    import simplejson as json
15except ImportError:
16    import json
17
18# Import ioflo libs
19from ioflo.aid.odicting import odict
20from ioflo.aid.osetting import oset
21from ioflo.aid.timing import StoreTimer
22from ioflo.aid.aiding import packByte, unpackByte
23
24# Import raet libs
25from ..abiding import *  # import globals
26from .. import raeting
27from ..raeting import Acceptance, PcktKind, TrnsKind, CoatKind, FootKind
28from .. import nacling
29from . import packeting
30from . import estating
31
32from ioflo.base.consoling import getConsole
33console = getConsole()
34
35class Transaction(object):
36    '''
37    RAET protocol transaction class
38    '''
39    Timeout =  5.0 # default timeout
40
41    def __init__(self, stack=None, remote=None, kind=None, timeout=None,
42                 rmt=False, bcst=False, sid=None, tid=None,
43                 txData=None, txPacket=None, rxPacket=None):
44        '''
45        Setup Transaction instance
46        timeout of 0.0 means no timeout go forever
47        '''
48        self.stack = stack
49        self.remote = remote
50        self.kind = kind or raeting.PACKET_DEFAULTS['tk']
51
52        if timeout is None:
53            timeout = self.Timeout
54        self.timeout = timeout
55        self.timer = StoreTimer(self.stack.store, duration=self.timeout)
56
57        self.rmt = rmt # remote initiator
58        self.bcst = bcst # bf flag
59
60        self.sid = sid
61        self.tid = tid
62
63        self.txData = txData or odict() # data used to prepare last txPacket
64        self.txPacket = txPacket  # last tx packet needed for retries
65        self.rxPacket = rxPacket  # last rx packet needed for index
66
67    @property
68    def index(self):
69        '''
70        Property is transaction tuple (rf, le, re, si, ti, bf,)
71        Not to be used in join (Joiner and Joinent) since bootstrapping
72        Use the txPacket (Joiner) or rxPacket (Joinent) .data instead
73        '''
74        le = self.remote.nuid
75        re = self.remote.fuid
76        return ((self.rmt, le, re, self.sid, self.tid, self.bcst,))
77
78    def process(self):
79        '''
80        Process time based handling of transaction like timeout or retries
81        '''
82        pass
83
84    def receive(self, packet):
85        '''
86        Process received packet Subclasses should super call this
87        '''
88        self.rxPacket = packet
89
90    def transmit(self, packet):
91        '''
92        Queue tx duple on stack transmit queue
93        '''
94        try:
95            self.stack.tx(packet.packed, self.remote.uid)
96        except raeting.StackError as ex:
97            console.terse(str(ex) + '\n')
98            self.stack.incStat(self.statKey())
99            self.remove(remote=self.remote, index=packet.index)
100            return
101        self.txPacket = packet
102
103    def add(self, remote=None, index=None):
104        '''
105        Add self to remote transactions
106        '''
107        if not index:
108            index = self.index
109        if not remote:
110            remote = self.remote
111        remote.addTransaction(index, self)
112
113    def remove(self, remote=None, index=None):
114        '''
115        Remove self from remote transactions
116        '''
117        if not index:
118            index = self.index
119        if not remote:
120            remote = self.remote
121        if remote:
122            remote.removeTransaction(index, transaction=self)
123
124    def statKey(self):
125        '''
126        Return the stat name key from class name
127        '''
128        return ("{0}_transaction_failure".format(self.__class__.__name__.lower()))
129
130    def nack(self, **kwa):
131        '''
132        Placeholder override in sub class
133        nack to terminate transaction with other side of transaction
134        '''
135        pass
136
137class Initiator(Transaction):
138    '''
139    RAET protocol initiator transaction class
140    '''
141    def __init__(self, **kwa):
142        '''
143        Setup Transaction instance
144        '''
145        kwa['rmt'] = False  # force rmt to False since local initator
146        super(Initiator, self).__init__(**kwa)
147
148    def process(self):
149        '''
150        Process time based handling of transaction like timeout or retries
151        '''
152        if self.timeout > 0.0 and self.timer.expired:
153            self.remove()
154
155class Correspondent(Transaction):
156    '''
157    RAET protocol correspondent transaction class
158    '''
159    Requireds = ['sid', 'tid', 'rxPacket']
160
161    def __init__(self, **kwa):
162        '''
163        Setup Transaction instance
164        '''
165        kwa['rmt'] = True  # force rmt to True since remote initiator
166
167        missing = []
168        for arg in self.Requireds:
169            if arg not in kwa:
170                missing.append(arg)
171        if missing:
172            emsg = "Missing required keyword arguments: '{0}'".format(missing)
173            raise TypeError(emsg)
174
175        super(Correspondent, self).__init__(**kwa)
176
177class Staler(Initiator):
178    '''
179    RAET protocol Staler initiator transaction class
180    '''
181    def __init__(self, **kwa):
182        '''
183        Setup Transaction instance
184        '''
185        for key in ['kind', 'sid', 'tid', 'rxPacket']:
186            if key not  in kwa:
187                emsg = "Missing required keyword arguments: '{0}'".format(key)
188                raise TypeError(emsg)
189        super(Staler, self).__init__(**kwa)
190
191        self.prep()
192
193    def prep(self):
194        '''
195        Prepare .txData for nack to stale
196        '''
197        self.txData.update(
198                            dh=self.rxPacket.data['sh'], # may need for index
199                            dp=self.rxPacket.data['sp'], # may need for index
200                            se=self.remote.nuid,
201                            de=self.rxPacket.data['se'],
202                            tk=self.kind,
203                            cf=self.rmt,
204                            bf=self.bcst,
205                            si=self.sid,
206                            ti=self.tid,
207                            ck=self.rxPacket.data['ck'], # CoatKind.nada.value,
208                            fk=self.rxPacket.data['fk'], # FootKind.nada.value
209                          )
210
211    def nack(self):
212        '''
213        Send nack to stale packet from correspondent.
214        This is used when a correspondent packet is received but no matching
215        Initiator transaction is found. So create a dummy initiator and send
216        a nack packet back. Do not add transaction so don't need to remove it.
217        '''
218        ha = (self.rxPacket.data['sh'], self.rxPacket.data['sp'])
219        try:
220            tkname = TrnsKind(self.rxPacket.data['tk'])
221        except ValueError as ex:
222            tkname = None
223        try:
224            pkname = TrnsKind(self.rxPacket.data['pk'])
225        except ValueError as ex:
226            pkname = None
227
228        emsg = ("Staler '{0}'. Stale transaction '{1}' packet '{2}' from '{3}' in {4} "
229                "nacking...\n".format(self.stack.name, tkname, pkname, ha, self.tid))
230        console.terse(emsg)
231        self.stack.incStat('stale_correspondent_attempt')
232
233        if self.rxPacket.data['se'] not in self.stack.remotes:
234            emsg = "Staler '{0}'. Unknown correspondent estate id '{1}'\n".format(
235                    self.stack.name, self.rxPacket.data['se'])
236            console.terse(emsg)
237            self.stack.incStat('unknown_correspondent_uid')
238            #return #maybe we should return and not respond at all in this case
239
240        body = odict()
241        packet = packeting.TxPacket(stack=self.stack,
242                                    kind=PcktKind.nack.value,
243                                    embody=body,
244                                    data=self.txData)
245        try:
246            packet.pack()
247        except raeting.PacketError as ex:
248            console.terse(str(ex) + '\n')
249            self.stack.incStat("packing_error")
250            return
251
252        self.stack.txes.append((packet.packed, ha))
253        console.terse("Staler '{0}'. Do Nack of stale correspondent {1} in {2} at {3}\n".format(
254                self.stack.name, ha, self.tid, self.stack.store.stamp))
255        self.stack.incStat('stale_correspondent_nack')
256
257
258class Stalent(Correspondent):
259    '''
260    RAET protocol Stalent correspondent transaction class
261    '''
262    Requireds = ['kind', 'sid', 'tid', 'rxPacket']
263
264    def __init__(self, **kwa):
265        '''
266        Setup Transaction instance
267        '''
268        super(Stalent, self).__init__(**kwa)
269
270        self.prep()
271
272    def prep(self):
273        '''
274        Prepare .txData for nack to stale
275        '''
276        self.txData.update(
277                            dh=self.rxPacket.data['sh'], # may need for index
278                            dp=self.rxPacket.data['sp'], # may need for index
279                            se=self.rxPacket.data['de'],
280                            de=self.rxPacket.data['se'],
281                            tk=self.kind,
282                            cf=self.rmt,
283                            bf=self.bcst,
284                            si=self.sid,
285                            ti=self.tid,
286                            ck=self.rxPacket.data['ck'],  # CoatKind.nada.value
287                            fk=self.rxPacket.data['fk'],  # FootKind.nada.value
288                           )
289
290    def nack(self, kind=PcktKind.nack.value):
291        '''
292        Send nack to stale packet from initiator.
293        This is used when a initiator packet is received but with a stale session id
294        So create a dummy correspondent and send a nack packet back.
295        Do not add transaction so don't need to remove it.
296        '''
297        ha = (self.rxPacket.data['sh'], self.rxPacket.data['sp'])
298        try:
299            tkname = TrnsKind(self.rxPacket.data['tk'])
300        except ValueError as ex:
301            tkname = None
302        try:
303            pkname = TrnsKind(self.rxPacket.data['pk'])
304        except ValueError as ex:
305            pkname = None
306
307        emsg = ("Stalent '{0}'. Stale transaction '{1}' packet '{2}' from '{3}' in {4} "
308                "nacking ...\n".format(self.stack.name, tkname, pkname, ha, self.tid))
309        console.terse(emsg)
310        self.stack.incStat('stale_initiator_attempt')
311
312        if self.rxPacket.data['se'] not in self.stack.remotes:
313            emsg = "Stalent '{0}'. Unknown initiator estate id '{1}'\n".format(
314                    self.stack.name,
315                    self.rxPacket.data['se'])
316            console.terse(emsg)
317            self.stack.incStat('unknown_initiator_uid')
318            #return #maybe we should return and not respond at all in this case
319
320        body = odict()
321        packet = packeting.TxPacket(stack=self.stack,
322                                    kind=kind,
323                                    embody=body,
324                                    data=self.txData)
325        try:
326            packet.pack()
327        except raeting.PacketError as ex:
328            console.terse(str(ex) + '\n')
329            self.stack.incStat("packing_error")
330            return
331
332        if kind == PcktKind.renew:
333            console.terse("Stalent '{0}'. Do Renew of {1} in {2} at {3}\n".format(
334                    self.stack.name, ha, self.tid, self.stack.store.stamp))
335        elif kind == PcktKind.refuse:
336            console.terse("Stalent '{0}'. Do Refuse of {1} in {2} at {3}\n".format(
337                    self.stack.name, ha, self.tid, self.stack.store.stamp))
338        elif kind == PcktKind.reject:
339            console.terse("Stalent '{0}'. Do Reject of {1} in {2} at {3}\n".format(
340                    self.stack.name, ha, self.tid, self.stack.store.stamp))
341        elif kind == PcktKind.nack:
342            console.terse("Stalent '{0}'. Do Nack of {1} in {2} at {3}\n".format(
343                    self.stack.name, ha, self.tid, self.stack.store.stamp))
344        else:
345            console.terse("Stalent '{0}'. Invalid nack kind {1}. Do Nack of {2} anyway "
346                    " to {3) at {4}\n".format(self.stack.name,
347                                       kind,
348                                       ha,
349                                       self.tid,
350                                       self.stack.store.stamp))
351            kind == PcktKind.nack
352
353        self.stack.txes.append((packet.packed, ha))
354        self.stack.incStat('stale_initiator_nack')
355
356class Joiner(Initiator):
357    '''
358    RAET protocol Joiner Initiator class Dual of Joinent
359
360    Joiner must always add new remote since always must anticipate response to
361    request.
362    '''
363    RedoTimeoutMin = 1.0 # initial timeout
364    RedoTimeoutMax = 4.0 # max timeout
365    PendRedoTimeout = 60.0 # Redo timeout when pended
366
367    def __init__(self,
368                 redoTimeoutMin=None,
369                 redoTimeoutMax=None,
370                 pendRedoTimeout=None,
371                 cascade=False,
372                 renewal=False,
373                 **kwa):
374        '''
375        Setup Transaction instance
376        '''
377        kwa['kind'] = TrnsKind.join.value
378        super(Joiner, self).__init__(**kwa)
379
380        self.cascade = cascade
381
382        self.redoTimeoutMax = redoTimeoutMax or self.RedoTimeoutMax
383        self.redoTimeoutMin = redoTimeoutMin or self.RedoTimeoutMin
384        self.redoTimer = StoreTimer(self.stack.store,
385                                           duration=self.redoTimeoutMin)
386        self.pendRedoTimeout = pendRedoTimeout or self.PendRedoTimeout
387
388        self.sid = 0 #always 0 for join
389        self.tid = self.remote.nextTid()
390        # fuid is assigned during join but want to preserve vacuousness for remove
391        self.vacuous = (self.remote.fuid == 0)
392        self.renewal = renewal # is current join a renew, vacuous rejoin
393        self.pended = False # Farside Correspondent has pended remote acceptance
394        self.prep()
395        # don't dump remote yet since its ephemeral until we join and get valid uid
396
397    def transmit(self, packet):
398        '''
399        Augment transmit with restart of redo timer
400        '''
401        super(Joiner, self).transmit(packet)
402        self.redoTimer.restart()
403
404    def add(self, remote=None, index=None):
405        '''
406        Augment with add self.remote to stack.joinees if vacuous
407        '''
408        super(Joiner, self).add(remote=remote, index=index)
409        # self.remote is now assigned
410        if self.vacuous: # vacuous
411            self.stack.joinees[self.remote.ha] = self.remote
412
413    def remove(self, remote=None, index=None):
414        '''
415        Remove self from stack transactions
416        '''
417        super(Joiner, self).remove(remote=remote, index=index)
418        # self.remote is now assigned
419        if self.vacuous: # vacuous
420            if self.remote.ha in self.stack.joinees and not self.remote.transactions:
421                del self.stack.joinees[self.remote.ha]
422
423    def receive(self, packet):
424        """
425        Process received packet belonging to this transaction
426        """
427        super(Joiner, self).receive(packet) #  self.rxPacket = packet
428
429        if packet.data['tk'] == TrnsKind.join:
430            if packet.data['pk'] == PcktKind.pend: # pending
431                self.stack.incStat('joiner_rx_pend')
432                self.pend()
433            elif packet.data['pk'] == PcktKind.response: # accepted
434                self.stack.incStat('joiner_rx_response')
435                self.accept()
436            elif packet.data['pk'] == PcktKind.nack: #stale
437                self.stack.incStat('joiner_rx_nack')
438                self.refuse()
439            elif packet.data['pk'] == PcktKind.refuse: #refused
440                self.stack.incStat('joiner_rx_refuse')
441                self.refuse()
442            elif packet.data['pk'] == PcktKind.renew: #renew
443                self.stack.incStat('joiner_rx_renew')
444                self.renew()
445            elif packet.data['pk'] == PcktKind.reject: #rejected
446                self.stack.incStat('joiner_rx_reject')
447                self.reject()
448
449    def process(self):
450        '''
451        Perform time based processing of transaction
452        '''
453        if self.timeout > 0.0 and self.timer.expired:
454            if self.txPacket and self.txPacket.data['pk'] == PcktKind.request:
455                self.remove(index=self.txPacket.index)
456            else:
457                self.remove(index=self.index) # in case never sent txPacket
458
459            console.concise("Joiner {0}. Timed out with {1} in {2} at {3}\n".format(
460                    self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
461
462            return
463
464        # need keep sending join until accepted or timed out
465        if self.redoTimer.expired:
466            if not self.pended:
467                duration = min(
468                         max(self.redoTimeoutMin,
469                              self.redoTimer.duration * 2.0),
470                         self.redoTimeoutMax)
471            else:
472                duration = self.pendRedoTimeout
473
474            self.redoTimer.restart(duration=duration)
475            if (self.txPacket and
476                    self.txPacket.data['pk'] == PcktKind.request):
477                self.transmit(self.txPacket) #redo
478                console.concise("Joiner {0}. Redo Join with {1} in {2} at {3}\n".format(
479                     self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
480                self.stack.incStat('joiner_tx_join_redo')
481            else: #check to see if status has changed to accept after other kind
482                if self.remote:
483                    status = self.stack.keep.statusRemote(self.remote, dump=True)
484                    if status == Acceptance.accepted:
485                        self.completify()
486                    elif status == Acceptance.rejected:
487                        "Joiner {0}: Estate '{1}' uid '{2}' keys rejected\n".format(
488                                self.stack.name, self.remote.name, self.remote.uid)
489                        self.stack.removeRemote(self.remote, clear=True)
490                        # removeRemote also nacks
491
492    def prep(self):
493        '''
494        Prepare .txData
495        '''
496        self.txData.update(
497                            dh=self.remote.ha[0], # may need for index
498                            dp=self.remote.ha[1], # may need for index
499                            se=self.remote.nuid,
500                            de=self.remote.fuid,
501                            tk=self.kind,
502                            cf=self.rmt,
503                            bf=self.bcst,
504                            si=self.sid,
505                            ti=self.tid,
506                            ck=CoatKind.nada.value,
507                            fk=FootKind.nada.value
508                          )
509
510    def join(self):
511        '''
512        Send join request
513        '''
514        joins = self.remote.joinInProcess()
515        if joins:
516            emsg = ("Joiner {0}. Join with {1} already in process. "
517                    "Aborting...\n".format(
518                                           self.stack.name,
519                                           self.remote.name))
520            console.concise(emsg)
521            return
522
523        self.remote.joined = None
524
525        if self.stack.kind is None:
526            self.stack.kind = 0
527        else:
528            if self.stack.kind < 0 or self.stack.kind > 255:
529                emsg = ("Joiner {0}. Invalid application kind field value {1} for {2}. "
530                                "Aborting...\n".format(
531                                                       self.stack.name,
532                                                       self.stack.kind,
533                                                       self.remote.name))
534                console.concise(emsg)
535                return
536
537        flags = [0, 0, 0, 0, 0, 0, 0, self.stack.main] # stack operation mode flags
538        operation = packByte(fmt=b'11111111', fields=flags)
539        body = odict([('name', self.stack.local.name),
540                      ('mode', operation),
541                      ('kind', self.stack.kind),
542                      ('verhex', str(self.stack.local.signer.verhex.decode('ISO-8859-1'))
543                               if self.stack.local.signer.verhex else None ),
544                      ('pubhex', str(self.stack.local.priver.pubhex.decode('ISO-8859-1'))
545                               if    self.stack.local.priver.pubhex else None),
546                      ('role', self.stack.local.role)])
547        packet = packeting.TxPacket(stack=self.stack,
548                                    kind=PcktKind.request.value,
549                                    embody=body,
550                                    data=self.txData)
551        try:
552            packet.pack()
553        except raeting.PacketError as ex:
554            console.terse(str(ex) + '\n')
555            self.stack.incStat("packing_error")
556            self.remove()
557            return
558        console.concise("Joiner {0}. Do Join with {1} in {2} at {3}\n".format(
559                        self.stack.name,
560                        self.remote.name,
561                        self.tid,
562                        self.stack.store.stamp))
563        self.transmit(packet)
564        self.add(index=self.txPacket.index)
565
566    def renew(self):
567        '''
568        Perform renew in response to nack renew
569        Reset to vacuous Road data and try joining again if not main
570        Otherwise act as if rejected
571        '''
572        # renew not allowed on immutable road
573        if not self.stack.mutable:
574            self.stack.incStat('join_renew_unallowed')
575            emsg = ("Joiner {0}. Renew from '{1}' not allowed on immutable"
576                    " road\n".format(self.stack.name, self.remote.name))
577            console.terse(emsg)
578            self.refuse()
579            return
580
581        console.terse("Joiner {0}. Renew from {1} in {2} at {3}\n".format(
582                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
583        self.stack.incStat('join_renew_attempt')
584        self.remove(index=self.txPacket.index)
585        if self.remote:
586            self.remote.fuid = 0 # forces vacuous join
587            self.stack.dumpRemote(self.remote) # since change fuid
588        self.stack.join(uid=self.remote.uid, timeout=self.timeout, renewal=True)
589
590    def pend(self):
591        '''
592        Process ack pend to join packet
593        '''
594        if not self.stack.parseInner(self.rxPacket):
595            return
596        self.pended = True
597
598    def accept(self):
599        '''
600        Perform acceptance in response to join response packet
601        '''
602        if not self.stack.parseInner(self.rxPacket):
603            return
604
605        data = self.rxPacket.data
606        body = self.rxPacket.body.data
607
608        name = body.get('name')
609        if not name:
610            emsg = "Missing remote name in accept packet\n"
611            console.terse(emsg)
612            self.stack.incStat('invalid_accept')
613            self.remove(index=self.txPacket.index)
614            return
615
616        mode = body.get('mode')
617        if mode is None or not isinstance(mode, int) or mode < 0 or mode > 255:
618            emsg = "Missing or invalid remote stack operation mode in accept packet\n"
619            console.terse(emsg)
620            self.stack.incStat('invalid_accept')
621            self.remove(index=self.txPacket.index)
622            return
623        flags = unpackByte(fmt=b'11111111', byte=mode, boolean=True)
624        main = flags[7]
625
626        kind = body.get('kind')
627        if kind is None:
628            emsg = "Missing or invalid remote application kind in accept packet\n"
629            console.terse(emsg)
630            self.stack.incStat('invalid_accept')
631            self.remove(index=self.txPacket.index)
632            return
633
634        fuid = body.get('uid')
635        if not fuid: # None or zero
636            emsg = "Missing or invalid remote farside uid in accept packet\n"
637            console.terse(emsg)
638            self.stack.incStat('invalid_accept')
639            self.remove(index=self.txPacket.index)
640            return
641
642        verhex = body.get('verhex', '')
643        if not verhex:
644            emsg = "Missing remote verifier key in accept packet\n"
645            console.terse(emsg)
646            self.stack.incStat('invalid_accept')
647            self.remove(index=self.txPacket.index)
648            return
649
650        pubhex = body.get('pubhex', '')
651        if not pubhex:
652            emsg = "Missing remote crypt key in accept packet\n"
653            console.terse(emsg)
654            self.stack.incStat('invalid_accept')
655            self.remove(index=self.txPacket.index)
656            return
657
658        role = body.get('role')
659        if not role:
660            emsg = "Missing remote role in accept packet\n"
661            console.terse(emsg)
662            self.stack.incStat('invalid_accept')
663            self.remove(index=self.txPacket.index)
664            return
665
666        rha = (data['sh'], data['sp'])
667        reid = data['se']
668        leid = data['de']
669
670        if self.vacuous:
671            self.remote.fuid = fuid
672            if not self.renewal: # ephemeral like
673                if name != self.remote.name:
674                    if name in self.stack.nameRemotes:
675                        emsg = ("Joiner {0}.  New name '{1}' unavailable for "
676                                "remote {2}\n".format(self.stack.name,
677                                                      name,
678                                                      self.remote.name))
679                        console.terse(emsg)
680                        self.nack(kind=PcktKind.reject.value)
681                        return
682                    try:
683                        self.stack.renameRemote(self.remote, new=name)
684                    except raeting.StackError as ex:
685                        console.terse(str(ex) + '\n')
686                        self.stack.incStat(self.statKey())
687                        self.remove(index=self.txPacket.index)
688                        return
689                self.remote.main = main
690                self.remote.kind = kind
691                self.remote.fuid = fuid
692                self.remote.role = role
693                self.remote.verfer = nacling.Verifier(verhex) # verify key manager
694                self.remote.pubber = nacling.Publican(pubhex) # long term crypt key manager
695
696        sameRoleKeys = (role == self.remote.role and
697                        ns2b(verhex) == self.remote.verfer.keyhex and
698                        ns2b(pubhex) == self.remote.pubber.keyhex)
699
700        sameAll = (sameRoleKeys and
701                   name == self.remote.name and
702                   rha == self.remote.ha and
703                   fuid == self.remote.fuid and
704                   main == self.remote.main and
705                   kind == self.remote.kind)
706
707        if not sameAll and not self.stack.mutable:
708            emsg = ("Joiner {0}. Attempt to change immutable road by "
709                                   "'{1}'\n".format(self.stack.name,
710                                                    self.remote.name))
711            console.terse(emsg)
712            self.nack(kind=PcktKind.reject.value) # reject not mutable road
713            self.remove(index=self.txPacket.index)
714            return
715
716        status = self.stack.keep.statusRole(role=role,
717                                                    verhex=verhex,
718                                                    pubhex=pubhex,
719                                                    dump=True)
720
721        if status == Acceptance.rejected:
722            if sameRoleKeys:
723                self.stack.removeRemote(self.remote, clear=True)
724                # remove also nacks so will also reject
725            else:
726                self.nack(kind=PcktKind.reject.value) # reject
727            return
728
729        # accepted or pending
730        self.remote.acceptance = status # change acceptance of remote
731
732        if not sameAll: # (and mutable)
733            if (name in self.stack.nameRemotes and
734                    self.stack.nameRemotes[name] is not self.remote): # non unique name
735                emsg = "Joiner {0}. Name '{1}' unavailable for remote {2}\n".format(
736                                self.stack.name, name, self.remote.name)
737                console.terse(emsg)
738                self.nack(kind=PcktKind.reject.value)
739                return
740
741            if name != self.remote.name:
742                try:
743                    self.stack.renameRemote(self.remote, new=name)
744                except raeting.StackError as ex:
745                    console.terse(str(ex) + '\n')
746                    self.stack.incStat(self.statKey())
747                    self.remove(index=self.txPacket.index)
748                    return
749
750            if rha != self.remote.ha:
751                self.remote.ha = rha
752            if fuid != self.remote.fuid:
753                self.remote.fuid = fuid
754            if main != self.remote.main:
755                self.remote.main = main
756            if kind != self.remote.kind:
757                self.remote.kind = kind
758            if self.remote.role != role:
759                self.remote.role = role # rerole
760            if ns2b(verhex) != self.remote.verfer.keyhex:
761                self.remote.verfer = nacling.Verifier(verhex) # verify key manager
762            if ns2b(pubhex) != self.remote.pubber.keyhex:
763                self.remote.pubber = nacling.Publican(pubhex) # long term crypt key manager
764            # don't dump until complete
765
766        if status == Acceptance.accepted: # accepted
767            self.completify()
768            return
769
770        # else status == raeting.acceptance.pending or None
771        self.pendify()
772
773    def pendify(self):
774        '''
775        Perform pending on remote
776        '''
777        self.stack.dumpRemote(self.remote)
778        self.ackPend()
779
780    def ackPend(self):
781        '''
782        Send ack pending to accept response
783        '''
784        body = odict()
785        packet = packeting.TxPacket(stack=self.stack,
786                                    kind=PcktKind.pend.value,
787                                    embody=body,
788                                    data=self.txData)
789        try:
790            packet.pack()
791        except raeting.PacketError as ex:
792            console.terse(str(ex) + '\n')
793            self.stack.incStat("packing_error")
794            self.remove(index=self.txPacket.index)
795            return
796
797        console.concise("Joiner {0}. Do Ack Pend of {1} in {2} at {3}\n".format(
798                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
799
800        self.transmit(packet)
801
802    def completify(self):
803        '''
804        Finalize full acceptance
805        '''
806        if self.remote.sid == 0: # session id  must be non-zero after join
807            self.remote.nextSid() # start new session
808            self.remote.replaceStaleInitiators() # this join not stale since sid == 0
809        if self.vacuous:
810            self.remote.rsid = 0 # reset .rsid on vacuous join so allow will work
811        self.remote.joined = True #accepted
812        self.stack.dumpRemote(self.remote)
813        self.stack.dumpLocal() #persist puid
814        self.ackAccept()
815
816    def ackAccept(self):
817        '''
818        Send ack accepted to accept response
819        '''
820        body = odict()
821        packet = packeting.TxPacket(stack=self.stack,
822                                    kind=PcktKind.ack.value,
823                                    embody=body,
824                                    data=self.txData)
825        try:
826            packet.pack()
827        except raeting.PacketError as ex:
828            console.terse(str(ex) + '\n')
829            self.stack.incStat("packing_error")
830            self.remove(index=self.txPacket.index)
831            return
832
833        console.concise("Joiner {0}. Do Ack Accept, Done with {1} in {2} at {3}\n".format(
834                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
835        self.stack.incStat("join_initiate_complete")
836
837        self.transmit(packet)
838        self.remove(index=self.txPacket.index) # self.rxPacket.index
839
840        if self.cascade:
841            self.stack.allow(uid=self.remote.uid, cascade=self.cascade, timeout=self.timeout)
842
843    def refuse(self):
844        '''
845        Process nack to join packet refused as join already in progress or some
846        other problem that does not change the joined attribute
847        '''
848        if not self.stack.parseInner(self.rxPacket):
849            return
850        console.terse("Joiner {0}. Refused by {1} in {2} at {3}\n".format(
851                 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
852        self.stack.incStat(self.statKey())
853        self.remove(index=self.txPacket.index)
854
855    def reject(self):
856        '''
857        Process nack to join packet, join rejected
858        '''
859        if not self.stack.parseInner(self.rxPacket):
860            return
861        console.terse("Joiner {0}. Rejected by {1} in {2} at {3}\n".format(
862                 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
863        self.stack.incStat(self.statKey())
864        self.remove(index=self.txPacket.index)
865        self.stack.removeRemote(self.remote, clear=True)
866
867    def nack(self, kind=PcktKind.nack.value):
868        '''
869        Send nack to accept response
870        '''
871        body = odict()
872        packet = packeting.TxPacket(stack=self.stack,
873                                    kind=kind,
874                                    embody=body,
875                                    data=self.txData)
876        try:
877            packet.pack()
878        except raeting.PacketError as ex:
879            console.terse(str(ex) + '\n')
880            self.stack.incStat("packing_error")
881            self.remove(index=self.txPacket.index)
882            return
883
884        if kind == PcktKind.refuse:
885            console.terse("Joiner {0}. Do Nack Refuse of {1} in {2} at {3}\n".format(
886                    self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
887        elif  kind == PcktKind.reject:
888            console.terse("Joiner {0}. Do Nack Reject of {1} in {2} at {3}\n".format(
889                    self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
890        elif kind == PcktKind.nack:
891            console.terse("Joiner {0}. Do Nack of {1} in {2} at {3}\n".format(
892                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
893        else:
894            console.terse("Joiner {0}. Invalid nack kind {1}. Do Nack of {2} anyway "
895                    "in {3} at {4}\n".format(self.stack.name,
896                                       kind,
897                                       self.remote.name,
898                                       self.tid,
899                                       self.stack.store.stamp))
900            kind == PcktKind.nack
901        self.stack.incStat(self.statKey())
902        self.transmit(packet)
903        self.remove(index=self.txPacket.index)
904
905class Joinent(Correspondent):
906    '''
907    RAET protocol Joinent transaction class, dual of Joiner
908
909    Joinent does not add new remote to .remotes if rejected
910    '''
911    RedoTimeoutMin = 0.1 # initial timeout
912    RedoTimeoutMax = 2.0 # max timeout
913    PendRedoTimeout = 60.0 # redo timeout when pended
914
915    def __init__(self,
916                 redoTimeoutMin=None,
917                 redoTimeoutMax=None,
918                 pendRedoTimeout=None,
919                 **kwa):
920        '''
921        Setup Transaction instance
922        '''
923        kwa['kind'] = TrnsKind.join.value
924        super(Joinent, self).__init__(**kwa)
925
926        self.redoTimeoutMax = redoTimeoutMax or self.RedoTimeoutMax
927        self.redoTimeoutMin = redoTimeoutMin or self.RedoTimeoutMin
928        self.redoTimer = StoreTimer(self.stack.store, duration=0.0)
929        self.pendRedoTimeout = pendRedoTimeout or self.PendRedoTimeout
930        self.vacuous = None # gets set in join method
931        self.pended = False # Farside initiator has pended remote acceptance
932        self.prep()
933
934    def transmit(self, packet):
935        '''
936        Augment transmit with restart of redo timer
937        '''
938        super(Joinent, self).transmit(packet)
939        self.redoTimer.restart()
940
941    def add(self, remote=None, index=None):
942        '''
943        Augment with add self.remote to stack.joinees if vacuous
944        '''
945        super(Joinent, self).add(remote=remote, index=index)
946        # self.remote is now assigned
947        if self.vacuous: # vacuous happens when both sides vacuous
948            self.stack.joinees[self.remote.ha] = self.remote
949
950    def remove(self, remote=None, index=None):
951        '''
952        Remove self from stack transactions
953        '''
954        super(Joinent, self).remove(remote=remote, index=index)
955        # self.remote is now assigned
956        if self.vacuous: # vacuous
957            if self.remote.ha in self.stack.joinees and not self.remote.transactions:
958                del self.stack.joinees[self.remote.ha]
959
960    def receive(self, packet):
961        """
962        Process received packet belonging to this transaction
963        """
964        super(Joinent, self).receive(packet) #  self.rxPacket = packet
965
966        if packet.data['tk'] == TrnsKind.join:
967            if packet.data['pk'] == PcktKind.request:
968                self.stack.incStat('joinent_rx_request')
969                self.join()
970            elif packet.data['pk'] == PcktKind.pend: # maybe pending
971                self.stack.incStat('joinent_rx_pend')
972                self.pend()
973            elif packet.data['pk'] == PcktKind.ack: #accepted by joiner
974                self.stack.incStat('joinent_rx_ack')
975                self.complete()
976            elif packet.data['pk'] == PcktKind.nack: #stale
977                self.stack.incStat('joinent_rx_nack')
978                self.refuse()
979            elif packet.data['pk'] == PcktKind.refuse: #refused
980                self.stack.incStat('joinent_rx_refuse')
981                self.refuse()
982            elif packet.data['pk'] == PcktKind.reject: #rejected
983                self.stack.incStat('joinent_rx_reject')
984                self.reject()
985
986    def process(self):
987        '''
988        Perform time based processing of transaction
989
990        '''
991        if self.timeout > 0.0 and self.timer.expired:
992            self.nack() # stale
993            console.concise("Joinent {0}. Timed out with {1} in {2} at {3}\n".format(
994                    self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
995            return
996
997        # need to perform the check for accepted status and then send accept
998        if self.redoTimer.expired:
999            if not self.pended:
1000                duration = min(
1001                             max(self.redoTimeoutMin,
1002                                  self.redoTimer.duration * 2.0),
1003                             self.redoTimeoutMax)
1004            else:
1005                duration = self.pendRedoTimeout
1006
1007            self.redoTimer.restart(duration=duration)
1008
1009            if (self.txPacket and
1010                    self.txPacket.data['pk'] == PcktKind.response):
1011                self.transmit(self.txPacket) #redo
1012                console.concise("Joinent {0}. Redo Accept with {1} in {2} at {3}\n".format(
1013                    self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
1014                self.stack.incStat('joinent_tx_accept_redo')
1015            else: #check to see if status has changed to accept
1016                if self.remote:
1017                    status = self.stack.keep.statusRemote(self.remote, dump=True)
1018                    if status == Acceptance.accepted:
1019                        self.ackAccept()
1020                    elif status == Acceptance.rejected:
1021                        "Stack {0}: Estate '{1}' uid '{2}' keys rejected\n".format(
1022                                self.stack.name, self.remote.name, self.remote.uid)
1023                        self.stack.removeRemote(self.remote,clear=True)
1024                        # removeRemote also nacks
1025
1026    def prep(self):
1027        '''
1028        Prepare .txData
1029        '''
1030        #since bootstrap transaction use the reversed seid and deid from packet
1031        self.txData.update(
1032                            dh=self.rxPacket.data['sh'], # may need for index
1033                            dp=self.rxPacket.data['sp'], # may need for index
1034                            se=self.rxPacket.data['de'],
1035                            de=self.rxPacket.data['se'],
1036                            tk=self.kind,
1037                            cf=self.rmt,
1038                            bf=self.bcst,
1039                            si=self.sid,
1040                            ti=self.tid,
1041                            ck=CoatKind.nada.value,
1042                            fk=FootKind.nada.value,
1043                          )
1044
1045    def join(self):
1046        '''
1047        Process join packet
1048        Each estate must have a set of unique credentials on the road
1049        The credentials are.
1050        uid (estate id), name, ha (host address, port)
1051        Each of the three credentials must be separably unique on the Road, that is
1052        the uid must be unique, the name must be unique, the ha must be unique.
1053
1054        The other credentials are the role and keys. Multiple estates may share
1055        the same role and associated keys. The keys are the signing key and the
1056        encryption key.
1057
1058        Once an estate has joined the first time it will be assigned an uid.
1059        Changing any of the credentials after this requires that the Road be mutable.
1060
1061        '''
1062        if not self.stack.parseInner(self.rxPacket):
1063            return
1064
1065        # Don't add transaction yet wait till later until transaction is permitted
1066        # as not a duplicate and role keys are not rejected
1067
1068        data = self.rxPacket.data
1069        body = self.rxPacket.body.data
1070
1071        name = body.get('name')
1072        if not name:
1073            emsg = "Missing remote name in join packet\n"
1074            console.terse(emsg)
1075            self.stack.incStat('invalid_join')
1076            self.remove(index=self.rxPacket.index)
1077            return
1078
1079        mode = body.get('mode')
1080        if mode is None or not isinstance(mode, int) or mode < 0 or mode > 255:
1081            emsg = "Missing or invalid remote stack operation mode in join packet\n"
1082            console.terse(emsg)
1083            self.stack.incStat('invalid_join')
1084            self.remove(index=self.rxPacket.index)
1085            return
1086        flags = unpackByte(fmt=b'11111111', byte=mode, boolean=True)
1087        main = flags[7]
1088
1089        kind = body.get('kind')
1090        if kind is None:
1091            emsg = "Missing or invalid remote application kind in join packet\n"
1092            console.terse(emsg)
1093            self.stack.incStat('invalid_join')
1094            self.remove(index=self.rxPacket.index)
1095            return
1096
1097        verhex = body.get('verhex', '')
1098        if not verhex:
1099            emsg = "Missing remote verifier key in join packet\n"
1100            console.terse(emsg)
1101            self.stack.incStat('invalid_join')
1102            self.remove(index=self.rxPacket.index)
1103            return
1104
1105        pubhex = body.get('pubhex', '')
1106        if not pubhex:
1107            emsg = "Missing remote crypt key in join packet\n"
1108            console.terse(emsg)
1109            self.stack.incStat('invalid_join')
1110            self.remove(index=self.rxPacket.index)
1111            return
1112
1113        role = body.get('role')
1114        if not role:
1115            emsg = "Missing remote role in join packet\n"
1116            console.terse(emsg)
1117            self.stack.incStat('invalid_join')
1118            self.remove(index=self.rxPacket.index)
1119            return
1120
1121        rha = (data['sh'], data['sp'])
1122        reid = data['se']
1123        leid = data['de']
1124
1125        self.vacuous = (leid == 0)
1126
1127        joins = self.remote.joinInProcess()
1128        for join in joins: # only one join at a time is permitted
1129            if join is self: # duplicate join packet so drop
1130                emsg = ("Joinent {0}. Duplicate join from {1}. "
1131                        "Dropping...\n".format(self.stack.name, self.remote.name))
1132                console.concise(emsg)
1133                self.stack.incStat('duplicate_join_attempt')
1134                return
1135
1136            if join.rmt: # is already a correspondent to a join
1137                emsg = ("Joinent {0}. Another joinent already in process with {1}. "
1138                       "Aborting...\n".format(self.stack.name, self.remote.name))
1139                console.concise(emsg)
1140                self.stack.incStat('redundant_join_attempt')
1141                self.nack(kind=PcktKind.refuse.value)
1142                return
1143
1144            else: # already initiator join in process, resolve race condition
1145                if self.vacuous and not join.vacuous: # non-vacuous beats vacuous
1146                    emsg = ("Joinent {0}. Already initiated non-vacuous join with {1}. "
1147                            "Aborting because vacuous...\n".format(
1148                                self.stack.name, self.remote.name))
1149                    console.concise(emsg)
1150                    self.stack.incStat('redundant_join_attempt')
1151                    self.nack(kind=PcktKind.refuse.value)
1152                    return
1153
1154                if not self.vacuous and join.vacuous: # non-vacuous beats vacuous
1155                    emsg = ("Joinent {0}. Removing vacuous initiator join with"
1156                            " {1}. Proceeding because not vacuous...\n".format(
1157                                            self.stack.name, self.remote.name))
1158                    console.concise(emsg)
1159                    join.nack(kind=PcktKind.refuse.value)
1160
1161                else: # both vacuous or non-vacuous, so use name to resolve
1162                    if self.stack.local.name < name: # abort local correspondent and remote initiator
1163                        emsg = ("Joinent {0}. Already initiated join with {1}. "
1164                                "Aborting because lesser local name...\n".format(
1165                                    self.stack.name, self.remote.name))
1166                        console.concise(emsg)
1167                        self.stack.incStat('redundant_join_attempt')
1168                        self.nack(kind=PcktKind.refuse.value)
1169                        return
1170
1171                    else: # nack to abort local initiator and remote correspondent
1172                        emsg = ("Joinent {0}. Removing initiator join with {1}. "
1173                                "Proceeding because lesser local name...\n".format(
1174                                    self.stack.name, self.remote.name))
1175                        console.concise(emsg)
1176                        join.nack(kind=PcktKind.refuse.value)
1177
1178        if self.vacuous: # vacuous join
1179            if not self.stack.main:
1180                emsg = "Joinent {0}. Invalid vacuous join not main\n".format(self.stack.name)
1181                console.terse(emsg)
1182                self.nack(kind=PcktKind.reject.value)
1183                return
1184
1185            if name in self.stack.nameRemotes: # non ephemeral name match
1186                self.remote = self.stack.nameRemotes[name] # replace so not ephemeral
1187
1188            else: # ephemeral and unique name
1189                self.remote.name = name
1190                self.remote.main = main
1191                self.remote.kind = kind
1192                self.remote.rha = rha
1193                self.remote.role = role
1194                self.remote.verfer = nacling.Verifier(verhex) # verify key manager
1195                self.remote.pubber = nacling.Publican(pubhex) # long term crypt key manager
1196                if self.remote.fuid != reid:
1197                    if self.remote.fuid == 0:  # vacuous join created remote in stack
1198                        self.remote.fuid = reid
1199                    else:
1200                        emsg = ("Joinent {0}. Mishandled join reid='{1}' !=  fuid='{2}' for "
1201                               "remote {2}\n".format(self.stack.name, reid, self.remote.fuid, name))
1202                        console.terse(emsg)
1203                        self.nack(kind=PcktKind.reject.value)
1204                        return
1205
1206        else: # non vacuous join
1207            if self.remote is not self.stack.remotes[leid]: # something is wrong
1208                emsg = "Joinent {0}. Mishandled join leid '{1}' for remote {2}\n".format(
1209                                                    self.stack.name, leid, name)
1210                console.terse(emsg)
1211                self.nack(kind=PcktKind.reject.value)
1212                return
1213
1214
1215        sameRoleKeys = (role == self.remote.role and
1216                        ns2b(verhex) == self.remote.verfer.keyhex and
1217                        ns2b(pubhex) == self.remote.pubber.keyhex)
1218
1219        sameAll = (sameRoleKeys and
1220                   name == self.remote.name and
1221                   rha == self.remote.ha and
1222                   reid == self.remote.fuid and
1223                   main == self.remote.main and
1224                   kind == self.remote.kind)
1225
1226        if not sameAll and not self.stack.mutable:
1227            emsg = ("Joinent {0}. Attempt to change immutable road by "
1228                                   "'{1}'\n".format(self.stack.name,
1229                                                    self.remote.name))
1230            console.terse(emsg)
1231            # reject not mutable road
1232            self.nack(kind=PcktKind.reject.value)
1233            return
1234
1235        status = self.stack.keep.statusRole(role=role,
1236                                            verhex=verhex,
1237                                            pubhex=pubhex,
1238                                            dump=True)
1239
1240
1241        if status == Acceptance.rejected:
1242            emsg = ("Joinent {0}. Keys of role='{1}' rejected for remote name='{2}'"
1243                    " nuid='{3}' fuid='{4}' ha='{5}'\n".format(self.stack.name,
1244                                                              self.remote.role,
1245                                                              self.remote.name,
1246                                                              self.remote.nuid,
1247                                                              self.remote.fuid,
1248                                                              self.remote.ha))
1249            console.concise(emsg)
1250            if sameRoleKeys and self.remote.uid in self.stack.remotes:
1251                self.stack.removeRemote(self.remote, clear=True) #clear remote
1252                # removeRemote also nacks which is a reject
1253            else: # reject as keys rejected
1254                self.nack(kind=PcktKind.reject.value)
1255            return
1256
1257        #accepted or pended
1258        self.remote.acceptance = status
1259
1260        if sameAll: #ephemeral will always be sameAll because assigned above
1261            if self.remote.uid not in self.stack.remotes: # ephemeral
1262                try:
1263                    self.stack.addRemote(self.remote)
1264                except raeting.StackError as ex:
1265                    console.terse(str(ex) + '\n')
1266                    self.stack.incStat(self.statKey())
1267                    return
1268
1269                emsg = ("Joinent {0}. Added new remote name='{1}' nuid='{2}' fuid='{3}' "
1270                        "ha='{4}' role='{5}'\n".format(self.stack.name,
1271                                          self.remote.name,
1272                                          self.remote.nuid,
1273                                          self.remote.fuid,
1274                                          self.remote.ha,
1275                                          self.remote.role))
1276                console.concise(emsg)
1277                # do dump until complete
1278
1279        else: # not sameAll (and mutable)
1280            # do both unique name check first so only change road if new unique
1281            if (name in self.stack.nameRemotes and
1282                    self.stack.nameRemotes[name] is not self.remote): # non unique name
1283                emsg = "Joinent {0}.  Name '{1}' unavailable for remote {2}\n".format(
1284                                self.stack.name, name, self.remote.name)
1285                console.terse(emsg)
1286                self.nack(kind=PcktKind.reject.value)
1287                return
1288
1289            if name != self.remote.name:
1290                try:
1291                    self.stack.renameRemote(self.remote, new=name)
1292                except raeting.StackError as ex:
1293                    console.terse(str(ex) + '\n')
1294                    self.stack.incStat(self.statKey())
1295                    return
1296
1297            if rha != self.remote.ha:
1298                self.remote.ha = rha
1299            if reid != self.remote.fuid:
1300                self.remote.fuid = reid
1301            if main != self.remote.main:
1302                self.remote.main = main
1303            if kind != self.remote.kind:
1304                self.remote.kind = kind
1305            if role != self.remote.role: # rerole
1306                self.remote.role = role
1307            if ns2b(verhex) != self.remote.verfer.keyhex:
1308                self.remote.verfer = nacling.Verifier(verhex) # verify key manager
1309            if ns2b(pubhex) != self.remote.pubber.keyhex:
1310                self.remote.pubber = nacling.Publican(pubhex) # long term crypt key manager
1311
1312        # add transaction
1313        self.add(remote=self.remote, index=self.rxPacket.index)
1314        self.remote.joined = None
1315
1316        if status == Acceptance.accepted:
1317            duration = min(
1318                            max(self.redoTimeoutMin,
1319                              self.redoTimer.duration * 2.0),
1320                            self.redoTimeoutMax)
1321            self.redoTimer.restart(duration=duration)
1322            self.ackAccept()
1323            return
1324
1325        # status == raeting.acceptance.pending or status == None:
1326        self.pendify()  # change to ackPend
1327
1328    def pendify(self):
1329        '''
1330        Performing pending operation on remote
1331        '''
1332        self.stack.dumpRemote(self.remote)
1333        self.ackPend()
1334
1335    def ackPend(self):
1336        '''
1337        Send ack to join request
1338        '''
1339        body = odict()
1340        packet = packeting.TxPacket(stack=self.stack,
1341                                    kind=PcktKind.pend.value,
1342                                    embody=body,
1343                                    data=self.txData)
1344        try:
1345            packet.pack()
1346        except raeting.PacketError as ex:
1347            console.terse(str(ex) + '\n')
1348            self.stack.incStat("packing_error")
1349            self.remove(index=self.rxPacket.index)
1350            return
1351
1352        console.concise("Joinent {0}. Do Ack Pending accept of {1} in {2} at {3}\n".format(
1353                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
1354        self.transmit(packet)
1355
1356    def ackAccept(self):
1357        '''
1358        Send accept response to join request
1359        '''
1360        if self.stack.kind is None:
1361            self.stack.kind = 0
1362        else:
1363            if self.stack.kind < 0 or self.stack.kind > 255:
1364                emsg = ("Joinent {0}. Invalid application kind field value {1} for {2}. "
1365                                "Aborting...\n".format(
1366                                                       self.stack.name,
1367                                                       self.stack.kind,
1368                                                       self.remote.name))
1369                console.concise(emsg)
1370                return
1371
1372        flags = [0, 0, 0, 0, 0, 0, 0, self.stack.main] # stack operation mode flags
1373        operation = packByte(fmt=b'11111111', fields=flags)
1374        body = odict([ ('name', self.stack.local.name),
1375                       ('mode', operation),
1376                       ('kind', self.stack.kind),
1377                       ('uid', self.remote.uid),
1378                       ('verhex', str(self.stack.local.signer.verhex.decode('ISO-8859-1'))
1379                                  if self.stack.local.signer.verhex else None ),
1380                       ('pubhex', str(self.stack.local.priver.pubhex.decode('ISO-8859-1'))
1381                                  if self.stack.local.priver.pubhex else None),
1382                       ('role', self.stack.local.role)])
1383        packet = packeting.TxPacket(stack=self.stack,
1384                                    kind=PcktKind.response.value,
1385                                    embody=body,
1386                                    data=self.txData)
1387        try:
1388            packet.pack()
1389        except raeting.PacketError as ex:
1390            console.terse(str(ex) + '\n')
1391            self.stack.incStat("packing_error")
1392            self.remove(index=self.rxPacket.index)
1393            return
1394
1395        console.concise("Joinent {0}. Do Accept of {1} in {2} at {3}\n".format(
1396                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
1397        self.transmit(packet)
1398
1399    def pend(self):
1400        '''
1401        Process ack pend to join packet
1402        '''
1403        if not self.stack.parseInner(self.rxPacket):
1404            return
1405        self.pended = True
1406
1407    def complete(self):
1408        '''
1409        process ack to accept response
1410        '''
1411        if not self.stack.parseInner(self.rxPacket):
1412            return
1413
1414        console.concise("Joinent {0}. Done with {1} in {2} at {3}\n".format(
1415                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
1416        self.stack.incStat("join_correspond_complete")
1417
1418        if self.remote.sid == 0: # session id  must be non-zero after join
1419            self.remote.nextSid() # start new session
1420            self.remote.replaceStaleInitiators()
1421        if self.vacuous:
1422            self.remote.rsid = 0 # reset .rsid on vacuous join so allow will work
1423        self.remote.joined = True # accepted
1424        self.stack.dumpRemote(self.remote)
1425        self.stack.dumpLocal() # persist puid
1426        self.remove(index=self.rxPacket.index)
1427
1428    def reject(self):
1429        '''
1430        Process reject nack because keys rejected
1431        '''
1432        if not self.stack.parseInner(self.rxPacket):
1433            return
1434
1435        console.terse("Joinent {0}. Rejected by {1} in {2} at {3}\n".format(
1436                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
1437        self.stack.incStat(self.statKey())
1438        self.remove(index=self.rxPacket.index)
1439        self.stack.removeRemote(self.remote, clear=True)
1440
1441    def refuse(self):
1442        '''
1443        Process refuse nack because join already in progress or stale
1444        '''
1445        if not self.stack.parseInner(self.rxPacket):
1446            return
1447        console.terse("Joinent {0}. Refused by {1} in {2} at {3}\n".format(
1448                 self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
1449        self.stack.incStat(self.statKey())
1450        self.remove(index=self.rxPacket.index)
1451
1452    def nack(self, kind=PcktKind.nack.value):
1453        '''
1454        Send nack to join request.
1455        Sometimes nack occurs without remote being added so have to nack using
1456        rxPacket source ha.
1457        '''
1458        #if not self.remote or self.remote.uid not in self.stack.remotes:
1459            #self.txData.update( dh=self.rxPacket.data['sh'], dp=self.rxPacket.data['sp'],)
1460            #ha = (self.rxPacket.data['sh'], self.rxPacket.data['sp'])
1461        #else:
1462            #ha = self.remote.ha
1463
1464        ha = (self.rxPacket.data['sh'], self.rxPacket.data['sp'])
1465
1466        body = odict()
1467        packet = packeting.TxPacket(stack=self.stack,
1468                                    kind=kind,
1469                                    embody=body,
1470                                    data=self.txData)
1471        try:
1472            packet.pack()
1473        except raeting.PacketError as ex:
1474            console.terse(str(ex) + '\n')
1475            self.stack.incStat("packing_error")
1476            self.remove(index=self.rxPacket.index)
1477            return
1478
1479        if kind == PcktKind.renew:
1480            console.terse("Joinent {0}. Do Nack Renew of {1} in {2} at {3}\n".format(
1481                    self.stack.name, ha, self.tid, self.stack.store.stamp))
1482        elif kind == PcktKind.refuse:
1483            console.terse("Joinent {0}. Do Nack Refuse of {1} in {2} at {3}\n".format(
1484                    self.stack.name, ha, self.tid, self.stack.store.stamp))
1485        elif kind == PcktKind.reject:
1486            console.terse("Joinent {0}. Do Nack Reject of {1} in {2} at {3}\n".format(
1487                    self.stack.name, ha, self.tid, self.stack.store.stamp))
1488        elif kind == PcktKind.nack:
1489            console.terse("Joinent {0}. Do Nack of {1} in {2} at {3}\n".format(
1490                    self.stack.name, ha, self.tid, self.stack.store.stamp))
1491        else:
1492            console.terse("Joinent {0}. Invalid nack kind {1}. Do Nack of {2} anyway "
1493                    " in {3} at {4}\n".format(self.stack.name,
1494                                       kind,
1495                                       ha,
1496                                       self.tid,
1497                                       self.stack.store.stamp))
1498            kind == PcktKind.nack
1499
1500        self.stack.incStat(self.statKey())
1501
1502        self.stack.txes.append((packet.packed, ha))
1503        self.remove(index=self.rxPacket.index)
1504
1505class Allower(Initiator):
1506    '''
1507    RAET protocol Allower Initiator class Dual of Allowent
1508    CurveCP handshake
1509    '''
1510    Timeout = 4.0
1511    RedoTimeoutMin = 0.25 # initial timeout
1512    RedoTimeoutMax = 1.0 # max timeout
1513
1514    def __init__(self, redoTimeoutMin=None, redoTimeoutMax=None,
1515                 cascade=False, **kwa):
1516        '''
1517        Setup instance
1518        '''
1519        kwa['kind'] = TrnsKind.allow.value
1520        super(Allower, self).__init__(**kwa)
1521
1522        self.cascade = cascade
1523
1524        self.redoTimeoutMax = redoTimeoutMax or self.RedoTimeoutMax
1525        self.redoTimeoutMin = redoTimeoutMin or self.RedoTimeoutMin
1526        self.redoTimer = StoreTimer(self.stack.store,
1527                                           duration=self.redoTimeoutMin)
1528
1529        self.sid = self.remote.sid
1530        self.tid = self.remote.nextTid()
1531        self.oreo = None # cookie from correspondent needed until handshake completed
1532        self.prep() # prepare .txData
1533
1534    def transmit(self, packet):
1535        '''
1536        Augment transmit with restart of redo timer
1537        '''
1538        super(Allower, self).transmit(packet)
1539        self.redoTimer.restart()
1540
1541    def receive(self, packet):
1542        """
1543        Process received packet belonging to this transaction
1544        """
1545        super(Allower, self).receive(packet) #  self.rxPacket = packet
1546
1547        if packet.data['tk'] == TrnsKind.allow:
1548            if packet.data['pk'] == PcktKind.cookie:
1549                self.cookie()
1550            elif packet.data['pk'] == PcktKind.ack:
1551                self.allow()
1552            elif packet.data['pk'] == PcktKind.nack: # rejected
1553                self.refuse()
1554            elif packet.data['pk'] == PcktKind.refuse: # refused
1555                self.refuse()
1556            elif packet.data['pk'] == PcktKind.reject: #rejected
1557                self.reject()
1558            elif packet.data['pk'] == PcktKind.unjoined: # unjoined
1559                self.unjoin()
1560
1561    def process(self):
1562        '''
1563        Perform time based processing of transaction
1564        '''
1565        if self.timeout > 0.0 and self.timer.expired:
1566            self.remove()
1567            console.concise("Allower {0}. Timed out with {1} in {2} at {3}\n".format(
1568                    self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
1569            return
1570
1571        # need keep sending join until accepted or timed out
1572        if self.redoTimer.expired:
1573            duration = min(
1574                         max(self.redoTimeoutMin,
1575                              self.redoTimer.duration * 2.0),
1576                         self.redoTimeoutMax)
1577            self.redoTimer.restart(duration=duration)
1578            if self.txPacket:
1579                if self.txPacket.data['pk'] == PcktKind.hello:
1580                    self.transmit(self.txPacket) # redo
1581                    console.concise("Allower {0}. Redo Hello with {1} in {2} at {3}\n".format(
1582                            self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
1583                    self.stack.incStat('redo_hello')
1584
1585                if self.txPacket.data['pk'] == PcktKind.initiate:
1586                    self.transmit(self.txPacket) # redo
1587                    console.concise("Allower {0}. Redo Initiate with {1} in {2} at {3}\n".format(
1588                             self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
1589                    self.stack.incStat('redo_initiate')
1590
1591                if self.txPacket.data['pk'] == PcktKind.ack:
1592                    self.transmit(self.txPacket) # redo
1593                    console.concise("Allower {0}. Redo Ack Final with {1} in {2} at {3}\n".format(
1594                             self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
1595                    self.stack.incStat('redo_final')
1596
1597    def prep(self):
1598        '''
1599        Prepare .txData
1600        '''
1601        self.txData.update(
1602                            dh=self.remote.ha[0], # maybe needed for index
1603                            dp=self.remote.ha[1], # maybe needed for index
1604                            se=self.remote.nuid,
1605                            de=self.remote.fuid,
1606                            tk=self.kind,
1607                            cf=self.rmt,
1608                            bf=self.bcst,
1609                            si=self.sid,
1610                            ti=self.tid,
1611                          )
1612
1613    def hello(self):
1614        '''
1615        Send hello request
1616        '''
1617        joins = self.remote.joinInProcess()
1618        if joins:
1619            emsg = ("Allower {0}. Attempt to allow while join still in process with {1}.  "
1620                                    "Aborting...\n".format(self.stack.name, self.remote.name))
1621            console.concise(emsg)
1622            self.stack.incStat('invalid_allow_attempt')
1623            return
1624
1625        allows = self.remote.allowInProcess()
1626        if allows:
1627            emsg = ("Allower {0}. Allow with {1} already in process\n".format(
1628                                    self.stack.name, self.remote.name))
1629            console.concise(emsg)
1630            return
1631
1632        self.remote.allowed = None
1633        if not self.remote.joined:
1634            emsg = "Allower {0}. Must be joined first\n".format(self.stack.name)
1635            console.terse(emsg)
1636            self.stack.incStat('unjoined_remote')
1637            self.stack.join(uid=self.remote.uid, cascade=self.cascade, timeout=self.timeout)
1638            return
1639
1640        self.remote.rekey() # refresh short term keys and reset .allowed to None
1641        self.add()
1642
1643        plain = binascii.hexlify(b''.rjust(32, b'\x00'))
1644        cipher, nonce = self.remote.privee.encrypt(plain, self.remote.pubber.key)
1645        body = raeting.HELLO_PACKER.pack(plain, self.remote.privee.pubraw, cipher, nonce)
1646
1647        packet = packeting.TxPacket(stack=self.stack,
1648                                    kind=PcktKind.hello,
1649                                    embody=body,
1650                                    data=self.txData)
1651        try:
1652            packet.pack()
1653        except raeting.PacketError as ex:
1654            console.terse(str(ex) + '\n')
1655            self.stack.incStat("packing_error")
1656            self.remove()
1657            return
1658        self.transmit(packet)
1659        console.concise("Allower {0}. Do Hello with {1} in {2} at {3}\n".format(
1660                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
1661
1662    def cookie(self):
1663        '''
1664        Process cookie packet
1665        '''
1666        if not self.stack.parseInner(self.rxPacket):
1667            return
1668
1669        data = self.rxPacket.data
1670        body = self.rxPacket.body.data
1671
1672        if not isinstance(body, bytes):
1673            emsg = "Invalid format of cookie packet body\n"
1674            console.terse(emsg)
1675            self.stack.incStat('invalid_cookie')
1676            #self.remove()
1677            self.nack(kind=PcktKind.reject.value)
1678            return
1679
1680        if len(body) != raeting.COOKIE_PACKER.size:
1681            emsg = "Invalid length of cookie packet body\n"
1682            console.terse(emsg)
1683            self.stack.incStat('invalid_cookie')
1684            #self.remove()
1685            self.nack(kind=PcktKind.reject.value)
1686            return
1687
1688        cipher, nonce = raeting.COOKIE_PACKER.unpack(body)
1689
1690        try:
1691            msg = self.remote.privee.decrypt(cipher, nonce, self.remote.pubber.key)
1692        except ValueError as ex:
1693            emsg = "Invalid cookie stuff: '{0}'\n".format(str(ex))
1694            console.terse(emsg)
1695            self.stack.incStat('invalid_cookie')
1696            #self.remove()
1697            self.nack(kind=PcktKind.reject.value)
1698            return
1699
1700        if len(msg) != raeting.COOKIESTUFF_PACKER.size:
1701            emsg = "Invalid length of cookie stuff\n"
1702            console.terse(emsg)
1703            self.stack.incStat('invalid_cookie')
1704            #self.remove()
1705            self.nack(kind=PcktKind.reject.value)
1706            return
1707
1708        shortraw, seid, deid, oreo = raeting.COOKIESTUFF_PACKER.unpack(msg)
1709
1710        if seid != self.remote.fuid or deid != self.remote.nuid:
1711            emsg = "Invalid seid or deid fields in cookie stuff\n"
1712            console.terse(emsg)
1713            self.stack.incStat('invalid_cookie')
1714            #self.remove()
1715            self.nack(kind=PcktKind.reject.value)
1716            return
1717
1718        self.oreo = binascii.hexlify(oreo)
1719        self.remote.publee = nacling.Publican(key=shortraw)
1720
1721        self.initiate()
1722
1723    def initiate(self):
1724        '''
1725        Send initiate request to cookie response to hello request
1726        '''
1727        vcipher, vnonce = self.stack.local.priver.encrypt(self.remote.privee.pubraw,
1728                                                self.remote.pubber.key)
1729
1730        fqdn = self.remote.fqdn
1731        if isinstance(fqdn, unicode):
1732            fqdn = fqdn.encode('ascii', 'ignore')
1733        fqdn = fqdn.ljust(128, b' ')[:128]
1734
1735        stuff = raeting.INITIATESTUFF_PACKER.pack(self.stack.local.priver.pubraw,
1736                                                  vcipher,
1737                                                  vnonce,
1738                                                  fqdn)
1739
1740        cipher, nonce = self.remote.privee.encrypt(stuff, self.remote.publee.key)
1741
1742        oreo = binascii.unhexlify(self.oreo)
1743        body = raeting.INITIATE_PACKER.pack(self.remote.privee.pubraw,
1744                                            oreo,
1745                                            cipher,
1746                                            nonce)
1747
1748        packet = packeting.TxPacket(stack=self.stack,
1749                                    kind=PcktKind.initiate,
1750                                    embody=body,
1751                                    data=self.txData)
1752        try:
1753            packet.pack()
1754        except raeting.PacketError as ex:
1755            console.terse(str(ex) + '\n')
1756            self.stack.incStat("packing_error")
1757            self.remove()
1758            return
1759
1760        self.transmit(packet)
1761        console.concise("Allower {0}. Do Initiate with {1} in {2} at {3}\n".format(
1762                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
1763
1764    def allow(self):
1765        '''
1766        Process ackInitiate packet
1767        Perform allowment in response to ack to initiate packet
1768        Transmits ack to complete transaction so correspondent knows
1769        '''
1770        if not self.stack.parseInner(self.rxPacket):
1771            return
1772
1773        self.remote.allowed = True
1774        self.remote.alived = True  # fast alive as soon as allowed
1775        self.ackFinal()
1776
1777    def ackFinal(self):
1778        '''
1779        Send ack to ack Initiate to terminate transaction
1780        This is so both sides wait on acks so transaction is not restarted until
1781        boths sides see completion.
1782        '''
1783        body = b''
1784        packet = packeting.TxPacket(stack=self.stack,
1785                                    kind=PcktKind.ack.value,
1786                                    embody=body,
1787                                    data=self.txData)
1788        try:
1789            packet.pack()
1790        except raeting.PacketError as ex:
1791            console.terse(str(ex) + '\n')
1792            self.stack.incStat("packing_error")
1793            self.remove()
1794            return
1795
1796        self.remove()
1797        self.transmit(packet)
1798
1799        console.concise("Allower {0}. Do Ack Final, Done with {1} in {2} at {3}\n".format(
1800                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
1801        self.stack.incStat("allow_initiate_complete")
1802
1803        self.remote.nextSid() # start new session always on successful allow
1804        self.remote.replaceStaleInitiators()
1805        self.stack.dumpRemote(self.remote)
1806        self.remote.sendSavedMessages() # could include messages saved on rejoin
1807        if self.cascade:
1808            self.stack.alive(uid=self.remote.uid, cascade=self.cascade, timeout=self.timeout)
1809
1810    def nack(self, kind=PcktKind.nack.value):
1811        '''
1812        Send nack to accept response
1813        '''
1814        body = b''
1815        packet = packeting.TxPacket(stack=self.stack,
1816                                    kind=kind,
1817                                    embody=body,
1818                                    data=self.txData)
1819        try:
1820            packet.pack()
1821        except raeting.PacketError as ex:
1822            console.terse(str(ex) + '\n')
1823            self.stack.incStat("packing_error")
1824            self.remove()
1825            return
1826
1827        if kind == PcktKind.refuse:
1828            console.terse("Allower {0}. Do Nack Refuse of {1} in {2} at {3}\n".format(
1829                    self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
1830        elif kind == PcktKind.reject:
1831            console.terse("Allower {0}. Do Nack Reject of {1} in {2} at {3}\n".format(
1832                    self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
1833        elif kind == PcktKind.nack:
1834            console.terse("Allower {0}. Do Nack of {1} in {2} at {3}\n".format(
1835                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
1836        else:
1837            console.terse("Allower {0}. Invalid nack kind {1}. Do Nack of {2} anyway "
1838                    " in {3} at {4}\n".format(self.stack.name,
1839                                       kind,
1840                                       self.remote.name,
1841                                       self.tid,
1842                                       self.stack.store.stamp))
1843            kind == PcktKind.nack
1844
1845        self.remove()
1846        self.stack.incStat(self.statKey())
1847        self.transmit(packet)
1848
1849    def refuse(self):
1850        '''
1851        Process nack refuse to packet
1852        '''
1853        if not self.stack.parseInner(self.rxPacket):
1854            return
1855
1856        self.remove()
1857        console.concise("Allower {0}. Refused by {1} in {2} at {3}\n".format(
1858                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
1859        self.stack.incStat(self.statKey())
1860
1861    def reject(self):
1862        '''
1863        Process nack reject to packet
1864        terminate in response to nack
1865        '''
1866        if not self.stack.parseInner(self.rxPacket):
1867            return
1868
1869        self.remote.allowed = False
1870        self.remove()
1871        console.concise("Allower {0}. Rejected by {1} in {2} at {3}\n".format(
1872                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
1873        self.stack.incStat(self.statKey())
1874
1875    def unjoin(self):
1876        '''
1877        Process unjoin packet
1878        terminate in response to unjoin
1879        '''
1880        if not self.stack.parseInner(self.rxPacket):
1881            return
1882
1883        self.remote.joined = False
1884        self.remove()
1885        console.concise("Allower {0}. Rejected unjoin by {1} in {2} at {3}\n".format(
1886                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
1887        self.stack.incStat(self.statKey())
1888        self.stack.join(uid=self.remote.uid, cascade=self.cascade, timeout=self.timeout)
1889
1890class Allowent(Correspondent):
1891    '''
1892    RAET protocol Allowent Correspondent class Dual of Allower
1893    CurveCP handshake
1894    '''
1895    Timeout = 4.0
1896    RedoTimeoutMin = 0.25 # initial timeout
1897    RedoTimeoutMax = 1.0 # max timeout
1898
1899    def __init__(self, redoTimeoutMin=None, redoTimeoutMax=None, **kwa):
1900        '''
1901        Setup instance
1902        '''
1903        kwa['kind'] = TrnsKind.allow.value
1904        super(Allowent, self).__init__(**kwa)
1905
1906        self.redoTimeoutMax = redoTimeoutMax or self.RedoTimeoutMax
1907        self.redoTimeoutMin = redoTimeoutMin or self.RedoTimeoutMin
1908        self.redoTimer = StoreTimer(self.stack.store,
1909                                           duration=self.redoTimeoutMin)
1910
1911        self.oreo = None #keep locally generated oreo around for redos
1912        self.prep() # prepare .txData
1913
1914    def transmit(self, packet):
1915        '''
1916        Augment transmit with restart of redo timer
1917        '''
1918        super(Allowent, self).transmit(packet)
1919        self.redoTimer.restart()
1920
1921    def receive(self, packet):
1922        """
1923        Process received packet belonging to this transaction
1924        """
1925        super(Allowent, self).receive(packet) #  self.rxPacket = packet
1926
1927        if packet.data['tk'] == TrnsKind.allow:
1928            if packet.data['pk'] == PcktKind.hello:
1929                self.hello()
1930            elif packet.data['pk'] == PcktKind.initiate:
1931                self.initiate()
1932            elif packet.data['pk'] == PcktKind.ack:
1933                self.final()
1934            elif packet.data['pk'] == PcktKind.nack: # rejected
1935                self.refuse()
1936            elif packet.data['pk'] == PcktKind.refuse: # refused
1937                self.refuse()
1938            elif packet.data['pk'] == PcktKind.reject: # rejected
1939                self.reject()
1940
1941    def process(self):
1942        '''
1943        Perform time based processing of transaction
1944
1945        '''
1946        if self.timeout > 0.0 and self.timer.expired:
1947            self.nack(kind=PcktKind.refuse.value)
1948            console.concise("Allowent {0}. Timed out with {1} in {2} at {3}\n".format(
1949                    self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
1950            return
1951
1952        # need to perform the check for accepted status and then send accept
1953        if self.redoTimer.expired:
1954            duration = min(
1955                         max(self.redoTimeoutMin,
1956                              self.redoTimer.duration * 2.0),
1957                         self.redoTimeoutMax)
1958            self.redoTimer.restart(duration=duration)
1959
1960            if self.txPacket:
1961                if self.txPacket.data['pk'] == PcktKind.cookie:
1962                    self.transmit(self.txPacket) #redo
1963                    console.concise("Allowent {0}. Redo Cookie with {1} in {2} at {3}\n".format(
1964                             self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
1965                    self.stack.incStat('redo_cookie')
1966
1967                if self.txPacket.data['pk'] == PcktKind.ack:
1968                    self.transmit(self.txPacket) #redo
1969                    console.concise("Allowent {0}. Redo Ack with {1} in {2} at {3}\n".format(
1970                             self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
1971                    self.stack.incStat('redo_allow')
1972
1973    def prep(self):
1974        '''
1975        Prepare .txData
1976        '''
1977        self.txData.update( #sh=self.stack.local.ha[0],
1978                            #sp=self.stack.local.ha[1],
1979                            dh=self.remote.ha[0], # maybe needed for index
1980                            dp=self.remote.ha[1], # maybe needed for index
1981                            se=self.remote.nuid,
1982                            de=self.remote.fuid,
1983                            tk=self.kind,
1984                            cf=self.rmt,
1985                            bf=self.bcst,
1986                            si=self.sid,
1987                            ti=self.tid, )
1988
1989    def hello(self):
1990        '''
1991        Process hello packet
1992        '''
1993        if not self.stack.parseInner(self.rxPacket):
1994            return
1995
1996        joins = self.remote.joinInProcess()
1997        if joins:
1998            emsg = ("Allowent {0}. Attempt to allow while join already in process with {1}.  "
1999                                    "Aborting...\n".format(self.stack.name, self.remote.name))
2000            console.concise(emsg)
2001            self.stack.incStat('invalid_allow_attempt')
2002            self.nack(kind=PcktKind.refuse.value)
2003
2004        allows = self.remote.allowInProcess()
2005        for allow in allows:
2006            if allow is self:
2007                emsg = ("Allowent {0}. Duplicate allow hello from {1}. "
2008                        "Dropping...\n".format(self.stack.name, self.remote.name))
2009                console.concise(emsg)
2010                self.stack.incStat('duplicate_allow_attempt')
2011                return
2012
2013            if allow.rmt: # is already a correspondent to an allow
2014                emsg = ("Allowent {0}. Another allowent already in process with {1}. "
2015                        "Aborting...\n".format(self.stack.name, self.remote.name))
2016                console.concise(emsg)
2017                self.stack.incStat('redundant_allow_attempt')
2018                self.nack(kind=PcktKind.refuse.value)
2019                return
2020
2021            else: # already initiator allow in process, resolve race condition
2022                if self.stack.local.name < self.remote.name: # abort correspondent
2023                    emsg = ("Allowent {0}. Already initiated allow with {1}. "
2024                            "Aborting because lesser local name...\n".format(
2025                                self.stack.name, self.remote.name))
2026                    console.concise(emsg)
2027                    self.stack.incStat('redundant_allow_attempt')
2028                    self.nack(kind=PcktKind.refuse.value)
2029                    return
2030
2031                else: # abort initiator, could let otherside nack do this
2032                    emsg = ("Allowent {0}. Removing initiator allow with {1}. "
2033                            "Proceeding because lesser local name...\n".format(
2034                                self.stack.name, self.remote.name))
2035                    console.concise(emsg)
2036                    allow.nack(kind=PcktKind.refuse.value)
2037
2038        self.remote.allowed = None
2039
2040        if not self.remote.joined:
2041            emsg = "Allowent {0}. Must be joined with {1} first\n".format(
2042                self.stack.name, self.remote.name)
2043            console.terse(emsg)
2044            self.stack.incStat('unjoined_allow_attempt')
2045            self.nack(kind=PcktKind.unjoined.value)
2046            return
2047
2048        self.remote.rekey() # refresh short term keys and .allowed
2049        self.add()
2050
2051        data = self.rxPacket.data
2052        body = self.rxPacket.body.data
2053
2054        if not isinstance(body, bytes):
2055            emsg = "Invalid format of hello packet body\n"
2056            console.terse(emsg)
2057            self.stack.incStat('invalid_hello')
2058            #self.remove()
2059            self.nack(kind=PcktKind.reject.value)
2060            return
2061
2062        if len(body) != raeting.HELLO_PACKER.size:
2063            emsg = "Invalid length of hello packet body\n"
2064            console.terse(emsg)
2065            self.stack.incStat('invalid_hello')
2066            #self.remove()
2067            self.nack(kind=PcktKind.reject.value)
2068            return
2069
2070        plain, shortraw, cipher, nonce = raeting.HELLO_PACKER.unpack(body)
2071
2072        self.remote.publee = nacling.Publican(key=shortraw)
2073        msg = self.stack.local.priver.decrypt(cipher, nonce, self.remote.publee.key)
2074        if msg != plain :
2075            emsg = "Invalid plain not match decrypted cipher\n"
2076            console.terse(emsg)
2077            self.stack.incStat('invalid_hello')
2078            #self.remove()
2079            self.nack(kind=PcktKind.reject.value)
2080            return
2081
2082        self.cookie()
2083
2084    def cookie(self):
2085        '''
2086        Send Cookie Packet
2087        '''
2088        oreo = self.stack.local.priver.nonce()
2089        self.oreo = binascii.hexlify(oreo)
2090
2091        stuff = raeting.COOKIESTUFF_PACKER.pack(self.remote.privee.pubraw,
2092                                                self.remote.nuid,
2093                                                self.remote.fuid,
2094                                                oreo)
2095
2096        cipher, nonce = self.stack.local.priver.encrypt(stuff, self.remote.publee.key)
2097        body = raeting.COOKIE_PACKER.pack(cipher, nonce)
2098        packet = packeting.TxPacket(stack=self.stack,
2099                                    kind=PcktKind.cookie.value,
2100                                    embody=body,
2101                                    data=self.txData)
2102        try:
2103            packet.pack()
2104        except raeting.PacketError as ex:
2105            console.terse(str(ex) + '\n')
2106            self.stack.incStat("packing_error")
2107            self.remove()
2108            return
2109        self.transmit(packet)
2110        console.concise("Allowent {0}. Do Cookie with {1} in {2} at {3}\n".format(
2111                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
2112
2113    def initiate(self):
2114        '''
2115        Process initiate packet
2116        '''
2117        if not self.stack.parseInner(self.rxPacket):
2118            return
2119        data = self.rxPacket.data
2120        body = self.rxPacket.body.data
2121
2122        if not isinstance(body, bytes):
2123            emsg = "Invalid format of initiate packet body\n"
2124            console.terse(emsg)
2125            self.stack.incStat('invalid_initiate')
2126            #self.remove()
2127            self.nack(kind=PcktKind.reject.value)
2128            return
2129
2130        if len(body) != raeting.INITIATE_PACKER.size:
2131            emsg = "Invalid length of initiate packet body\n"
2132            console.terse(emsg)
2133            self.stack.incStat('invalid_initiate')
2134            #self.remove()
2135            self.nack(kind=PcktKind.reject.value)
2136            return
2137
2138        shortraw, oreo, cipher, nonce = raeting.INITIATE_PACKER.unpack(body)
2139
2140        if shortraw != self.remote.publee.keyraw:
2141            emsg = "Mismatch of short term public key in initiate packet\n"
2142            console.terse(emsg)
2143            self.stack.incStat('invalid_initiate')
2144            #self.remove()
2145            self.nack(kind=PcktKind.reject.value)
2146            return
2147
2148        if (binascii.hexlify(oreo) != self.oreo):
2149            emsg = "Stale or invalid cookie in initiate packet\n"
2150            console.terse(emsg)
2151            self.stack.incStat('invalid_initiate')
2152            #self.remove()
2153            self.nack(kind=PcktKind.reject.value)
2154            return
2155
2156        msg = self.remote.privee.decrypt(cipher, nonce, self.remote.publee.key)
2157        if len(msg) != raeting.INITIATESTUFF_PACKER.size:
2158            emsg = "Invalid length of initiate stuff\n"
2159            console.terse(emsg)
2160            self.stack.incStat('invalid_initiate')
2161            #self.remove()
2162            self.nack(kind=PcktKind.reject.value)
2163            return
2164
2165        pubraw, vcipher, vnonce, fqdn = raeting.INITIATESTUFF_PACKER.unpack(msg)
2166        if pubraw != self.remote.pubber.keyraw:
2167            emsg = "Mismatch of long term public key in initiate stuff\n"
2168            console.terse(emsg)
2169            self.stack.incStat('invalid_initiate')
2170            #self.remove()
2171            self.nack(kind=PcktKind.reject.value)
2172            return
2173
2174        fqdn = fqdn.rstrip(b' ')
2175        lfqdn = self.stack.local.fqdn
2176        if isinstance(lfqdn, unicode):
2177            lfqdn = lfqdn.encode('ascii', 'ignore')
2178        lfqdn = lfqdn.ljust(128, b' ')[:128].rstrip(b' ')
2179        if fqdn != lfqdn:
2180            emsg = ("Mismatch of local fqdn {0} with rxed fqdn {1} in initiate "
2181                    "stuff\n".format(lfqdn, fqdn))
2182            console.terse(emsg)
2183            #self.stack.incStat('invalid_initiate')
2184            #self.remove()
2185            #self.nack(kind=raeting.pcktKinds.reject)
2186            #return
2187
2188        vouch = self.stack.local.priver.decrypt(vcipher, vnonce, self.remote.pubber.key)
2189        if vouch != self.remote.publee.keyraw or vouch != shortraw:
2190            emsg = "Short term key vouch failed\n"
2191            console.terse(emsg)
2192            self.stack.incStat('invalid_initiate')
2193            #self.remove()
2194            self.nack(kind=PcktKind.reject.value)
2195            return
2196
2197        self.ackInitiate()
2198
2199    def ackInitiate(self):
2200        '''
2201        Send ack to initiate request
2202        '''
2203
2204        body = b''
2205        packet = packeting.TxPacket(stack=self.stack,
2206                                    kind=PcktKind.ack.value,
2207                                    embody=body,
2208                                    data=self.txData)
2209        try:
2210            packet.pack()
2211        except raeting.PacketError as ex:
2212            console.terse(str(ex) + '\n')
2213            self.stack.incStat("packing_error")
2214            self.remove()
2215            return
2216
2217        self.transmit(packet)
2218        console.concise("Allowent {0}. Do Ack Initiate with {1} in {2} at {3}\n".format(
2219                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
2220
2221        self.allow()
2222
2223    def allow(self):
2224        '''
2225        Perform allowment
2226        '''
2227        self.remote.allowed = True
2228        self.remote.alived = True  # Fast alived as soon as allowed
2229        self.remote.nextSid() # start new session always on successful allow
2230        self.remote.replaceStaleInitiators()
2231        self.stack.dumpRemote(self.remote)
2232
2233    def final(self):
2234        '''
2235        Process ackFinal packet
2236        So that both sides are waiting on acks at the end so does not restart
2237        transaction if ack initiate is dropped
2238        '''
2239        if not self.stack.parseInner(self.rxPacket):
2240            return
2241
2242        self.remove()
2243        console.concise("Allowent {0}. Done with {1} in {2} at {3}\n".format(
2244                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
2245        self.stack.incStat("allow_correspond_complete")
2246        self.remote.sendSavedMessages() # could include messages saved on rejoin
2247
2248    def refuse(self):
2249        '''
2250        Process nack refuse packet
2251        '''
2252        if not self.stack.parseInner(self.rxPacket):
2253            return
2254
2255        self.remove()
2256        console.concise("Allowent {0}. Refused by {1} in {2} at {3}n".format(
2257                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
2258        self.stack.incStat(self.statKey())
2259
2260    def reject(self):
2261        '''
2262        Process nack packet
2263        terminate in response to nack
2264        '''
2265        if not self.stack.parseInner(self.rxPacket):
2266            return
2267
2268        self.remote.allowed = False
2269        self.remove()
2270        console.concise("Allowent {0}. Rejected by {1} in {2} at {3}\n".format(
2271                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
2272        self.stack.incStat(self.statKey())
2273
2274    def nack(self, kind=PcktKind.nack.value):
2275        '''
2276        Send nack to terminate allow transaction
2277        '''
2278        body = b''
2279        packet = packeting.TxPacket(stack=self.stack,
2280                                    kind=kind,
2281                                    embody=body,
2282                                    data=self.txData)
2283        try:
2284            packet.pack()
2285        except raeting.PacketError as ex:
2286            console.terse(str(ex) + '\n')
2287            self.stack.incStat("packing_error")
2288            self.remove()
2289            return
2290
2291        if kind==PcktKind.refuse:
2292            console.terse("Allowent {0}. Do Nack Refuse of {1} in {2} at {3}\n".format(
2293                    self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
2294        elif kind==PcktKind.reject:
2295            console.concise("Allowent {0}. Do Nack Reject {1} in {2} at {3}\n".format(
2296                    self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
2297        elif kind==PcktKind.unjoined:
2298            console.concise("Allowent {0}. Do Nack Unjoined {1} in {2} at {3}\n".format(
2299                    self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
2300        elif kind == PcktKind.nack:
2301            console.terse("Allowent {0}. Do Nack of {1} in {2} at {3}\n".format(
2302                    self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
2303        else:
2304            console.terse("Allowent {0}. Invalid nack kind {1}. Do Nack of {2} anyway "
2305                    " in {3} at {4}\n".format(self.stack.name,
2306                                       kind,
2307                                       self.remote.name,
2308                                       self.tid,
2309                                       self.stack.store.stamp))
2310            kind == PcktKind.nack
2311
2312        self.remove()
2313        self.transmit(packet)
2314        self.stack.incStat(self.statKey())
2315
2316class Aliver(Initiator):
2317    '''
2318    RAET protocol Aliver Initiator class Dual of Alivent
2319    Sends keep alive heatbeat messages to detect presence
2320
2321
2322    update alived status of .remote
2323    only use .remote.refresh to update
2324
2325    '''
2326    Timeout = 2.0
2327    RedoTimeoutMin = 0.25 # initial timeout
2328    RedoTimeoutMax = 1.0 # max timeout
2329
2330    def __init__(self, redoTimeoutMin=None, redoTimeoutMax=None,
2331                cascade=False, **kwa):
2332        '''
2333        Setup instance
2334        '''
2335        kwa['kind'] = TrnsKind.alive.value
2336        super(Aliver, self).__init__(**kwa)
2337
2338        self.cascade = cascade
2339
2340        self.redoTimeoutMax = redoTimeoutMax or self.RedoTimeoutMax
2341        self.redoTimeoutMin = redoTimeoutMin or self.RedoTimeoutMin
2342        self.redoTimer = StoreTimer(self.stack.store,
2343                                           duration=self.redoTimeoutMin)
2344
2345        self.sid = self.remote.sid
2346        self.tid = self.remote.nextTid()
2347        self.prep() # prepare .txData
2348
2349    def transmit(self, packet):
2350        '''
2351        Augment transmit with restart of redo timer
2352        '''
2353        super(Aliver, self).transmit(packet)
2354        self.redoTimer.restart()
2355
2356    def receive(self, packet):
2357        """
2358        Process received packet belonging to this transaction
2359        """
2360        super(Aliver, self).receive(packet)
2361
2362        if packet.data['tk'] == TrnsKind.alive:
2363            if packet.data['pk'] == PcktKind.ack:
2364                self.complete()
2365            elif packet.data['pk'] == PcktKind.nack: # refused
2366                self.refuse()
2367            elif packet.data['pk'] == PcktKind.refuse: # refused
2368                self.refuse()
2369            elif packet.data['pk'] == PcktKind.unjoined: # unjoin
2370                self.unjoin()
2371            elif packet.data['pk'] == PcktKind.unallowed: # unallow
2372                self.unallow()
2373            elif packet.data['pk'] == PcktKind.reject: # rejected
2374                self.reject()
2375
2376    def process(self):
2377        '''
2378        Perform time based processing of transaction
2379        '''
2380        if self.timeout > 0.0 and self.timer.expired:
2381            console.concise("Aliver {0}. Timed out with {1} in {2} at {3}\n".format(
2382                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
2383            self.remove()
2384            self.remote.refresh(alived=False) # mark as dead
2385            return
2386
2387        # need keep sending message until completed or timed out
2388        if self.redoTimer.expired:
2389            duration = min(
2390                         max(self.redoTimeoutMin,
2391                              self.redoTimer.duration * 2.0),
2392                         self.redoTimeoutMax)
2393            self.redoTimer.restart(duration=duration)
2394            if self.txPacket:
2395                if self.txPacket.data['pk'] == PcktKind.request:
2396                    self.transmit(self.txPacket) # redo
2397                    console.concise("Aliver {0}. Redo with {1} in {2} at {3}\n".format(
2398                        self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
2399                    self.stack.incStat('redo_alive')
2400
2401    def prep(self):
2402        '''
2403        Prepare .txData
2404        '''
2405        self.txData.update( #sh=self.stack.local.ha[0],
2406                            #sp=self.stack.local.ha[1],
2407                            dh=self.remote.ha[0], # maybe needed for index
2408                            dp=self.remote.ha[1], # maybe needed for index
2409                            se=self.remote.nuid,
2410                            de=self.remote.fuid,
2411                            tk=self.kind,
2412                            cf=self.rmt,
2413                            bf=self.bcst,
2414                            si=self.sid,
2415                            ti=self.tid,)
2416
2417    def alive(self, body=None):
2418        '''
2419        Send message
2420        '''
2421        if not self.remote.joined:
2422            emsg = "Aliver {0}. Must be joined with {1} first\n".format(
2423                    self.stack.name, self.remote.name)
2424            console.terse(emsg)
2425            self.stack.incStat('unjoined_remote')
2426            self.stack.join(uid=self.remote.uid, cascade=self.cascade, timeout=self.timeout)
2427            return
2428
2429        if not self.remote.allowed:
2430            emsg = "Aliver {0}. Must be allowed with {1} first\n".format(
2431                    self.stack.name, self.remote.name)
2432            console.terse(emsg)
2433            self.stack.incStat('unallowed_remote')
2434            self.stack.allow(uid=self.remote.uid, cascade=self.cascade, timeout=self.timeout)
2435            return
2436
2437        self.remote.refresh(alived=None) #Restart timer but do not change alived status
2438        self.add()
2439
2440        body = odict()
2441        packet = packeting.TxPacket(stack=self.stack,
2442                                    kind=PcktKind.request.value,
2443                                    embody=body,
2444                                    data=self.txData)
2445        try:
2446            packet.pack()
2447        except raeting.PacketError as ex:
2448            console.terse(str(ex) + '\n')
2449            self.stack.incStat("packing_error")
2450            self.remove()
2451            return
2452        self.transmit(packet)
2453        console.concise("Aliver {0}. Do Alive with {1} in {2} at {3}\n".format(
2454                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
2455
2456    def complete(self):
2457        '''
2458        Process ack packet. Complete transaction and remove
2459        '''
2460        if not self.stack.parseInner(self.rxPacket):
2461            return
2462        self.remote.refresh(alived=True) # restart timer mark as alive
2463        self.remove()
2464        console.concise("Aliver {0}. Done with {1} in {2} at {3}\n".format(
2465                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
2466        self.stack.incStat("alive_complete")
2467
2468    def refuse(self):
2469        '''
2470        Process nack refuse packet
2471        terminate in response to nack
2472        '''
2473        if not self.stack.parseInner(self.rxPacket):
2474            return
2475        self.remote.refresh(alived=None) # restart timer do not change status
2476        self.remove()
2477        console.concise("Aliver {0}. Refused by {1} in {2} at {3}\n".format(
2478                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
2479        self.stack.incStat(self.statKey())
2480
2481    def reject(self):
2482        '''
2483        Process nack reject packet
2484        terminate in response to nack
2485        '''
2486        if not self.stack.parseInner(self.rxPacket):
2487            return
2488        self.remote.refresh(alived=False) # restart timer set status to False
2489        self.remove()
2490        console.concise("Aliver {0}. Rejected by {1} in {2} at {3}\n".format(
2491                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
2492        self.stack.incStat(self.statKey())
2493
2494    def unjoin(self):
2495        '''
2496        Process unjoin packet
2497        terminate in response to unjoin
2498        '''
2499        if not self.stack.parseInner(self.rxPacket):
2500            return
2501        self.remote.refresh(alived=None) # restart timer do not change status
2502        self.remote.joined = False
2503        self.remove()
2504        console.concise("Aliver {0}. Refused unjoin by {1} in {2} at {3}\n".format(
2505                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
2506        self.stack.incStat(self.statKey())
2507        self.stack.join(uid=self.remote.uid, cascade=self.cascade, timeout=self.timeout)
2508
2509    def unallow(self):
2510        '''
2511        Process unallow nack packet
2512        terminate in response to unallow
2513        '''
2514        if not self.stack.parseInner(self.rxPacket):
2515            return
2516        self.remote.refresh(alived=None) # restart timer do not change status
2517        self.remote.allowed = False
2518        self.remove()
2519        console.concise("Aliver {0}. Refused unallow by {1} in {2} at {3}\n".format(
2520                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
2521        self.stack.incStat(self.statKey())
2522        self.stack.allow(uid=self.remote.uid, cascade=self.cascade, timeout=self.timeout)
2523
2524class Alivent(Correspondent):
2525    '''
2526    RAET protocol Alivent Correspondent class Dual of Aliver
2527    Keep alive heartbeat
2528    '''
2529    Timeout = 10.0
2530
2531    def __init__(self, **kwa):
2532        '''
2533        Setup instance
2534        '''
2535        kwa['kind'] = TrnsKind.alive.value
2536        super(Alivent, self).__init__(**kwa)
2537
2538        self.prep() # prepare .txData
2539
2540    def receive(self, packet):
2541        """
2542        Process received packet belonging to this transaction
2543        """
2544        super(Alivent, self).receive(packet)
2545
2546        if packet.data['tk'] == TrnsKind.alive:
2547            if packet.data['pk'] == PcktKind.request:
2548                self.alive()
2549
2550    def process(self):
2551        '''
2552        Perform time based processing of transaction
2553
2554        '''
2555        if self.timeout > 0.0 and self.timer.expired:
2556            self.nack() #manage restarts alive later
2557            console.concise("Alivent {0}. Timed out with {1} in {2} at {3}\n".format(
2558                    self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
2559            return
2560
2561    def prep(self):
2562        '''
2563        Prepare .txData
2564        '''
2565        self.txData.update( #sh=self.stack.local.ha[0],
2566                            #sp=self.stack.local.ha[1],
2567                            dh=self.remote.ha[0], # maybe needed for index
2568                            dp=self.remote.ha[1], # maybe needed for index
2569                            se=self.remote.nuid,
2570                            de=self.remote.fuid,
2571                            tk=self.kind,
2572                            cf=self.rmt,
2573                            bf=self.bcst,
2574                            si=self.sid,
2575                            ti=self.tid,)
2576
2577    def alive(self):
2578        '''
2579        Process alive packet
2580        '''
2581        if not self.stack.parseInner(self.rxPacket):
2582            return
2583
2584        if not self.remote.joined:
2585            self.remote.refresh(alived=None) # received signed packet so its alive
2586            emsg = "Alivent {0}. Must be joined with {1} first\n".format(
2587                    self.stack.name, self.remote.name)
2588            console.terse(emsg)
2589            self.stack.incStat('unjoined_alive_attempt')
2590            self.nack(kind=PcktKind.unjoined.value)
2591            return
2592
2593        if not self.remote.allowed:
2594            self.remote.refresh(alived=None) # received signed packet so its alive
2595            emsg = "Alivent {0}. Must be allowed with {1} first\n".format(
2596                    self.stack.name, self.remote.name)
2597            console.terse(emsg)
2598            self.stack.incStat('unallowed_alive_attempt')
2599            self.nack(kind=PcktKind.unallowed.value)
2600            return
2601
2602        self.add()
2603
2604        data = self.rxPacket.data
2605        body = self.rxPacket.body.data
2606
2607        body = odict()
2608        packet = packeting.TxPacket(stack=self.stack,
2609                                    kind=PcktKind.ack.value,
2610                                    embody=body,
2611                                    data=self.txData)
2612        try:
2613            packet.pack()
2614        except raeting.PacketError as ex:
2615            console.terse(str(ex) + '\n')
2616            self.stack.incStat("packing_error")
2617            self.remove()
2618            return
2619
2620        self.transmit(packet)
2621        console.concise("Alivent {0}. Do ack alive with {1} in {2} at {3}\n".format(
2622                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
2623        self.remote.refresh(alived=True)
2624        self.remove()
2625        console.concise("Alivent {0}. Done with {1} in {2} at {3}\n".format(
2626                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
2627        self.stack.incStat("alive_complete")
2628
2629    def nack(self, kind=PcktKind.nack.value):
2630        '''
2631        Send nack to terminate alive transaction
2632        '''
2633        body = odict()
2634        packet = packeting.TxPacket(stack=self.stack,
2635                                    kind=kind,
2636                                    embody=body,
2637                                    data=self.txData)
2638        try:
2639            packet.pack()
2640        except raeting.PacketError as ex:
2641            console.terse(str(ex) + '\n')
2642            self.stack.incStat("packing_error")
2643            self.remove()
2644            return
2645
2646        if kind == PcktKind.refuse:
2647                console.terse("Alivent {0}. Do Refuse of {1} in {2} at {3}\n".format(
2648                        self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
2649        elif kind == PcktKind.unjoined:
2650                console.terse("Alivent {0}. Do Unjoined of {1} in {2} at {3}\n".format(
2651                        self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
2652        elif kind == PcktKind.unallowed:
2653                console.terse("Alivent {0}. Do Unallowed of {1} in {2} at {3}\n".format(
2654                        self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
2655        elif kind == PcktKind.reject:
2656            console.concise("Alivent {0}. Do Reject {1} in {2} at {3}\n".format(
2657                    self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
2658        elif kind == PcktKind.nack:
2659            console.terse("Alivent {0}. Do Nack of {1} in {2} at {3}\n".format(
2660                    self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
2661        else:
2662            console.terse("Alivent {0}. Invalid nack kind {1}. Do Nack of {2} anyway "
2663                    " in {3} at {4}\n".format(self.stack.name,
2664                                       kind,
2665                                       self.remote.name,
2666                                       self.tid,
2667                                       self.stack.store.stamp))
2668            kind == PcktKind.nack
2669
2670        self.transmit(packet)
2671        self.remove()
2672
2673        self.stack.incStat(self.statKey())
2674
2675class Messenger(Initiator):
2676    '''
2677    RAET protocol Messenger Initiator class Dual of Messengent
2678    Generic messages
2679    '''
2680    Timeout = 0.0
2681    RedoTimeoutMin = 0.2 # initial timeout
2682    RedoTimeoutMax = 0.5 # max timeout
2683
2684    def __init__(self, redoTimeoutMin=None, redoTimeoutMax=None, burst=0, **kwa):
2685        '''
2686        Setup instance
2687        '''
2688        kwa['kind'] = TrnsKind.message.value
2689        super(Messenger, self).__init__(**kwa)
2690
2691        self.redoTimeoutMax = redoTimeoutMax or self.RedoTimeoutMax
2692        self.redoTimeoutMin = redoTimeoutMin or self.RedoTimeoutMin
2693        self.redoTimer = StoreTimer(self.stack.store,
2694                                           duration=self.redoTimeoutMin)
2695
2696        self.burst = max(0, int(burst)) # BurstSize
2697        self.misseds = oset()  # ordered set of currently missed segments
2698        self.acked = False  # Have received at least one ack
2699
2700        self.sid = self.remote.sid
2701        self.tid = self.remote.nextTid()
2702        self.prep() # prepare .txData
2703        self.tray = packeting.TxTray(stack=self.stack)
2704
2705    def transmit(self, packet):
2706        '''
2707        Augment transmit with restart of redo timer
2708        '''
2709        super(Messenger, self).transmit(packet)
2710        self.redoTimer.restart()
2711
2712    def receive(self, packet):
2713        """
2714        Process received packet belonging to this transaction
2715        """
2716        super(Messenger, self).receive(packet)
2717
2718        if packet.data['tk'] == TrnsKind.message:
2719            if packet.data['pk'] == PcktKind.ack: # more
2720                self.acked = True
2721                self.another()  # continue message
2722            elif packet.data['pk'] == PcktKind.resend:  # resend
2723                self.acked = True
2724                self.resend()  # resend missed segments
2725            elif packet.data['pk'] == PcktKind.done:  # completed
2726                self.acked = True
2727                self.complete()
2728            elif packet.data['pk'] == PcktKind.nack: # rejected
2729                self.reject()
2730
2731    def process(self):
2732        '''
2733        Perform time based processing of transaction
2734        '''
2735        if self.timeout > 0.0 and self.timer.expired:
2736            self.remove()
2737            console.concise("Messenger {0}. Timed out with {1} in {2} at {3}\n".format(
2738                    self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
2739            return
2740
2741        # keep sending message  until completed or timed out
2742        if self.redoTimer.expired:
2743            duration = min(
2744                         max(self.redoTimeoutMin,
2745                              self.redoTimer.duration * 2.0),
2746                         self.redoTimeoutMax)
2747            self.redoTimer.restart(duration=duration)
2748            if self.txPacket:
2749                if self.txPacket.data['pk'] in [PcktKind.message]:
2750                    if self.acked and not self.txPacket.data['af']:  # turn on AgnFlag if not set
2751                        self.txPacket.data.update(af=True)
2752                        self.txPacket.repack()
2753                    self.transmit(self.txPacket) # redo
2754                    console.concise("Messenger {0}. Redo Segment {1} with "
2755                                    "{2} in {3} at {4}\n".format(
2756                                    self.stack.name,
2757                                    self.txPacket.data['sn'],
2758                                    self.remote.name,
2759                                    self.tid,
2760                                    self.stack.store.stamp))
2761                    self.stack.incStat('redo_segment')
2762
2763    def prep(self):
2764        '''
2765        Prepare .txData
2766        '''
2767        self.txData.update( #sh=self.stack.local.ha[0],
2768                            #sp=self.stack.local.ha[1],
2769                            dh=self.remote.ha[0], # maybe needed for index
2770                            dp=self.remote.ha[1], # maybe needed for index
2771                            se=self.remote.nuid,
2772                            de=self.remote.fuid,
2773                            tk=self.kind,
2774                            cf=self.rmt,
2775                            bf=self.bcst,
2776                            si=self.sid,
2777                            ti=self.tid,)
2778
2779    def message(self, body=None):
2780        '''
2781        Send message or part of message. So repeatedly called until complete
2782        '''
2783
2784        if not self.remote.allowed:
2785            emsg = "Messenger {0}. Must be allowed with {1} first\n".format(
2786                    self.stack.name, self.remote.name)
2787            console.terse(emsg)
2788            self.stack.incStat('unallowed_remote')
2789            self.remove()
2790            return
2791
2792        if not self.tray.packets:
2793            try:
2794                self.tray.pack(data=self.txData, body=body)
2795            except raeting.PacketError as ex:
2796                console.terse(str(ex) + '\n')
2797                self.stack.incStat("packing_error")
2798                self.remove()
2799                return
2800
2801        if self.tray.current >= len(self.tray.packets):
2802            emsg = "Messenger {0}. Current packet {1} greater than num packets {2}\n".format(
2803                                self.stack.name, self.tray.current, len(self.tray.packets))
2804            console.terse(emsg)
2805            self.remove()
2806            return
2807
2808        if self.index not in self.remote.transactions:
2809            self.add()
2810        elif self.remote.transactions[self.index] != self:
2811            emsg = "Messenger {0}. Remote {1} Index collision of {2} in {3} at {4}\n".format(
2812                                self.stack.name,
2813                                self.remote.name,
2814                                self.index,
2815                                self.tid,
2816                                self.stack.store.stamp)
2817            console.terse(emsg)
2818            self.incStat('message_index_collision')
2819            self.remove()
2820            return
2821
2822        burst = (min(self.burst, (len(self.tray.packets) - self.tray.current))
2823                    if self.burst else (len(self.tray.packets) - self.tray.current))
2824
2825        packets = self.tray.packets[self.tray.current:self.tray.current + burst]
2826        if packets:
2827            last = packets[-1]
2828            last.data.update(wf=True)  # set wait flag on last packet in burst
2829            last.repack()
2830
2831        for packet in packets:
2832            self.transmit(packet)
2833            self.tray.last = self.tray.current
2834            self.tray.current += 1
2835            self.stack.incStat("message_segment_tx")
2836            console.concise("Messenger {0}. Do Message Segment {1} with {2} in {3} at {4}\n".format(
2837                    self.stack.name, self.tray.last, self.remote.name, self.tid, self.stack.store.stamp))
2838
2839    def another(self):
2840        '''
2841        Process ack packet and continue sending
2842        '''
2843        if not self.stack.parseInner(self.rxPacket):
2844            return
2845        self.remote.refresh(alived=True)
2846        self.stack.incStat("message_ack_rx")
2847
2848        if self.misseds:
2849            self.sendMisseds()
2850        else:
2851            current = self.rxPacket.data['sn'] + 1
2852            if self.tray.current > current:
2853                console.concise("Messenger {0}. Current {1} is ahead of requested {2}. Adjust.\n".format(
2854                    self.stack.name, self.tray.current, current))
2855                self.tray.current = current
2856                self.tray.last = current - 1
2857            if self.tray.current < len(self.tray.packets):
2858                self.message()  # continue message
2859
2860    def resend(self):
2861        '''
2862        Process resend packet and update .misseds list of missing packets
2863        Then send misseds
2864        '''
2865        if not self.stack.parseInner(self.rxPacket):
2866            return
2867
2868        self.remote.refresh(alived=True)
2869        self.stack.incStat('message_resend_rx')
2870
2871        data = self.rxPacket.data
2872        body = self.rxPacket.body.data
2873
2874        misseds = body.get('misseds')  # indexes of missed segments
2875        if misseds:
2876            if not self.tray.packets:
2877                emsg = "Invalid resend request '{0}'\n".format(misseds)
2878                console.terse(emsg)
2879                self.stack.incStat('invalid_resend')
2880                return
2881
2882            for m in misseds:
2883                try:
2884                    packet = self.tray.packets[m]
2885                except IndexError as ex:
2886                    #console.terse(str(ex) + '\n')
2887                    console.terse("Invalid misseds segment number {0}\n".format(m))
2888                    self.stack.incStat("invalid_misseds")
2889                    return
2890                self.misseds.add(packet)  # add segment, set only adds if unique
2891            self.sendMisseds()
2892
2893    def sendMisseds(self):
2894        '''
2895        Send a burst of missed packets
2896        '''
2897        if self.misseds:
2898            burst = (min(self.burst, (len(self.misseds))) if
2899                     self.burst else len(self.misseds))
2900            # make list of first burst number of packets
2901            misseds = [missed for missed in self.misseds][:burst]
2902            for packet in misseds[:-1]:
2903                repack = False
2904                if not packet.data['af']:  # turn on again flag if not set
2905                    packet.data.update(af=True)
2906                    repack = True
2907                if packet.data['wf']:  # turn off wait flag if set
2908                    packet.data.update(wf=False)
2909                    repack = True
2910                if repack:
2911                    packet.repack()
2912            for packet in misseds[-1:]:  # last packet
2913                repack = False
2914                if not packet.data['af']:  # turn on again flag is not set
2915                    packet.data.update(af=True)
2916                    repack = True
2917                if not packet.data['wf']:  # turn on wait flag if not set
2918                    packet.data.update(wf=True)
2919                    repack = True
2920                if repack:
2921                    packet.repack()
2922
2923            for packet in misseds:
2924                self.transmit(packet)
2925                self.stack.incStat("message_segment_tx")
2926                console.concise("Messenger {0}. Do Resend Message Segment "
2927                                "{1} with {2} in {3} at {4}\n".format(
2928                    self.stack.name,
2929                    packet.data['sn'],
2930                    self.remote.name,
2931                    self.tid,
2932                    self.stack.store.stamp))
2933                self.misseds.discard(packet)  # remove from self.misseds
2934
2935    def complete(self):
2936        '''
2937        Process Done Ack
2938        Complete transaction and remove
2939        '''
2940        if not self.stack.parseInner(self.rxPacket):
2941            return
2942
2943        self.remote.refresh(alived=True)
2944        self.stack.incStat('message_complete_rx')
2945
2946        self.remove()
2947        console.concise("Messenger {0}. Done with {1} in {2} at {3}\n".format(
2948                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
2949        self.stack.incStat("message_initiate_complete")
2950
2951    def reject(self):
2952        '''
2953        Process nack packet
2954        terminate in response to nack
2955        '''
2956        if not self.stack.parseInner(self.rxPacket):
2957            return
2958
2959        self.remote.refresh(alived=True)
2960        self.stack.incStat('message_reject_rx')
2961
2962        self.remove()
2963        console.concise("Messenger {0}. Rejected by {1} in {2} at {3}\n".format(
2964                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
2965        self.stack.incStat(self.statKey())
2966
2967    def nack(self):
2968        '''
2969        Send nack to terminate transaction
2970        '''
2971        body = odict()
2972        packet = packeting.TxPacket(stack=self.stack,
2973                                    kind=PcktKind.nack.value,
2974                                    embody=body,
2975                                    data=self.txData)
2976        try:
2977            packet.pack()
2978        except raeting.PacketError as ex:
2979            console.terse(str(ex) + '\n')
2980            self.stack.incStat("packing_error")
2981            self.remove()
2982            return
2983
2984        self.transmit(packet)
2985        self.stack.incStat('message_nack_tx')
2986        self.remove()
2987        console.concise("Messenger {0}. Do Nack Reject of {1} in {2} at {3}\n".format(
2988                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
2989        self.stack.incStat(self.statKey())
2990
2991class Messengent(Correspondent):
2992    '''
2993    RAET protocol Messengent Correspondent class Dual of Messenger
2994    Generic Messages
2995    '''
2996    Timeout = 0.0
2997    RedoTimeoutMin = 0.2 # initial timeout
2998    RedoTimeoutMax = 0.5 # max timeout
2999
3000    def __init__(self, redoTimeoutMin=None, redoTimeoutMax=None, **kwa):
3001        '''
3002        Setup instance
3003        '''
3004        kwa['kind'] = TrnsKind.message.value
3005        super(Messengent, self).__init__(**kwa)
3006
3007        self.redoTimeoutMax = redoTimeoutMax or self.RedoTimeoutMax
3008        self.redoTimeoutMin = redoTimeoutMin or self.RedoTimeoutMin
3009        self.redoTimer = StoreTimer(self.stack.store,
3010                                           duration=self.redoTimeoutMin)
3011
3012        self.wait = False  # wf wait flag
3013        self.lowest = None
3014        self.prep() # prepare .txData
3015        self.tray = packeting.RxTray(stack=self.stack)
3016
3017    def transmit(self, packet):
3018        '''
3019        Augment transmit with restart of redo timer
3020        '''
3021        super(Messengent, self).transmit(packet)
3022        self.redoTimer.restart()
3023
3024    def receive(self, packet):
3025        """
3026        Process received packet belonging to this transaction
3027        """
3028        super(Messengent, self).receive(packet)
3029
3030        # resent message
3031        if packet.data['tk'] == TrnsKind.message:
3032            if packet.data['pk'] == PcktKind.message:
3033                self.message()
3034            elif packet.data['pk'] == PcktKind.nack: # rejected
3035                self.reject()
3036
3037    def process(self):
3038        '''
3039        Perform time based processing of transaction
3040
3041        '''
3042        if self.timeout > 0.0 and self.timer.expired:
3043            self.nack()
3044            console.concise("Messengent {0}. Timed out with {1} in {2} at {3}\n".format(
3045                    self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
3046            return
3047
3048        if self.redoTimer.expired:
3049            duration = min(
3050                         max(self.redoTimeoutMin,
3051                              self.redoTimer.duration * 2.0),
3052                         self.redoTimeoutMax)
3053            self.redoTimer.restart(duration=duration)
3054
3055            if self.tray.complete:
3056                self.complete()
3057            else:
3058                misseds = self.tray.missing(begin=self.lowest)
3059                if misseds:  # resent missed segments
3060                    self.lowest = misseds[0]
3061                    self.resend(misseds)
3062                else:  # always ask for more here
3063                    self.ack()
3064
3065    def prep(self):
3066        '''
3067        Prepare .txData
3068        '''
3069        self.txData.update( #sh=self.stack.local.ha[0],
3070                            #sp=self.stack.local.ha[1],
3071                            dh=self.remote.ha[0], # maybe needed for index
3072                            dp=self.remote.ha[1], # maybe needed for index
3073                            se=self.remote.nuid,
3074                            de=self.remote.fuid,
3075                            tk=self.kind,
3076                            cf=self.rmt,
3077                            bf=self.bcst,
3078                            wf=self.rxPacket.data['wf'],  # was self.wait
3079                            si=self.sid,
3080                            ti=self.tid,
3081                            ck=self.rxPacket.data['ck'],  # so acks use same coat kind encrypted
3082                            fk=self.rxPacket.data['fk'],  # so acks use same foot kind signed
3083                          )
3084
3085    def message(self):
3086        '''
3087        Process message packet. Called repeatedly for each packet in message
3088        '''
3089        if not self.remote.allowed:
3090            emsg = "Messengent {0}. Must be allowed with {1} first\n".format(
3091                    self.stack.name,  self.remote.name)
3092            console.terse(emsg)
3093            self.stack.incStat('unallowed_message_attempt')
3094            self.nack()
3095            return
3096
3097        try:
3098            body = self.tray.parse(self.rxPacket)
3099        except raeting.PacketError as ex:
3100            console.terse(str(ex) + '\n')
3101            self.incStat('parsing_message_error')
3102            self.nack()
3103            return
3104
3105        if self.index not in self.remote.transactions:
3106            self.add()
3107        elif self.remote.transactions[self.index] != self:
3108            emsg = "Messengent {0}. Remote {1} Index collision of {2} in {3} at {4}\n".format(
3109                                self.stack.name,
3110                                self.remote.name,
3111                                self.index,
3112                                self.tid,
3113                                self.stack.store.stamp)
3114            console.terse(emsg)
3115            self.incStat('message_index_collision')
3116            self.nack()
3117            return
3118
3119        self.remote.refresh(alived=True)
3120        self.stack.incStat("message_segment_rx")
3121
3122        self.wait = self.rxPacket.data['wf']  # sender is waiting for ack
3123
3124        if self.tray.complete:
3125            self.complete()
3126        elif self.wait:  # ask for more if sender waiting for ack
3127            misseds = self.tray.missing(begin=self.lowest)
3128            if misseds:  # resent missed segments
3129                self.lowest = misseds[0]
3130                self.resend(misseds)
3131            else:
3132                self.ack()
3133
3134    def ack(self):
3135        '''
3136        Send ack to message
3137        '''
3138        body = odict()
3139        packet = packeting.TxPacket(stack=self.stack,
3140                                    kind=PcktKind.ack.value,
3141                                    embody=body,
3142                                    data=self.txData)
3143        packet.data['sn'] = self.tray.highest
3144        try:
3145            packet.pack()
3146        except raeting.PacketError as ex:
3147            console.terse(str(ex) + '\n')
3148            self.stack.incStat("packing_error")
3149            self.remove()
3150            return
3151        self.transmit(packet)
3152        self.stack.incStat("message_more_ack")
3153        console.concise("Messengent {0}. Do Ack More from {1} on Segment {2} with {3} in {4} at {5}\n".format(
3154            self.stack.name,
3155            self.tray.highest + 1,
3156            self.rxPacket.data['sn'],
3157            self.remote.name,
3158            self.tid,
3159            self.stack.store.stamp))
3160
3161    def resend(self, misseds):
3162        '''
3163        Send resend request(s) for missing packets
3164        '''
3165        while misseds:
3166            if len(misseds) > 64:
3167                remainders = misseds[64:] # only do at most 64 at a time
3168                misseds = misseds[:64]
3169            else:
3170                remainders = []
3171
3172            body = odict(misseds=misseds)
3173            packet = packeting.TxPacket(stack=self.stack,
3174                                        kind=PcktKind.resend.value,
3175                                        embody=body,
3176                                        data=self.txData)
3177            try:
3178                packet.pack()
3179            except raeting.PacketError as ex:
3180                console.terse(str(ex) + '\n')
3181                self.stack.incStat("packing_error")
3182                self.remove()
3183                return
3184            self.transmit(packet)
3185            self.stack.incStat("message_resend_tx")
3186            console.concise("Messengent {0}. Do Resend Segments {1} with {2} in {3} at {4}\n".format(
3187                    self.stack.name,
3188                    misseds,
3189                    self.remote.name,
3190                    self.tid,
3191                    self.stack.store.stamp))
3192            misseds = remainders
3193
3194    def complete(self):
3195        '''
3196        Complete transaction and remove
3197        '''
3198        self.done()
3199        console.verbose("{0} received message body\n{1}\n".format(
3200            self.stack.name, self.tray.body))
3201        # application layer authorizaiton needs to know who sent the message
3202        self.stack.rxMsgs.append((self.tray.body, self.remote.name))
3203        self.remove()
3204        console.concise("Messengent {0}. Complete with {1} in {2} at {3}\n".format(
3205                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
3206        self.stack.incStat("messengent_correspond_complete")
3207
3208    def done(self):
3209        '''
3210        Send done ack to complete message
3211        '''
3212        body = odict()
3213        packet = packeting.TxPacket(stack=self.stack,
3214                                    kind=PcktKind.done.value,
3215                                    embody=body,
3216                                    data=self.txData)
3217        try:
3218            packet.pack()
3219        except raeting.PacketError as ex:
3220            console.terse(str(ex) + '\n')
3221            self.stack.incStat("packing_error")
3222            self.remove()
3223            return
3224        self.transmit(packet)
3225        self.stack.incStat("message_complete_ack")
3226        console.concise("Messengent {0}. Do Ack Done Message on Segment {1} with {2} in {3} at {4}\n".format(
3227            self.stack.name,
3228            self.rxPacket.data['sn'],
3229            self.remote.name,
3230            self.tid,
3231            self.stack.store.stamp))
3232
3233    def reject(self):
3234        '''
3235        Process nack packet
3236        terminate in response to nack
3237        '''
3238        if not self.stack.parseInner(self.rxPacket):
3239            return
3240
3241        self.remote.refresh(alived=True)
3242        self.stack.incStat("message_reject_nack")
3243
3244        self.remove()
3245        console.concise("Messengent {0}. Rejected by {1} in {2} at {3}\n".format(
3246                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
3247        self.stack.incStat(self.statKey())
3248
3249    def nack(self):
3250        '''
3251        Send nack to terminate messenger transaction
3252        '''
3253        body = odict()
3254        packet = packeting.TxPacket(stack=self.stack,
3255                                    kind=PcktKind.nack.value,
3256                                    embody=body,
3257                                    data=self.txData)
3258        try:
3259            packet.pack()
3260        except raeting.PacketError as ex:
3261            console.terse(str(ex) + '\n')
3262            self.stack.incStat("packing_error")
3263            self.remove()
3264            return
3265
3266        self.transmit(packet)
3267        self.remove()
3268        console.concise("Messagent {0}. Do Nack Reject of {1} in {2} at {3}\n".format(
3269                self.stack.name, self.remote.name, self.tid, self.stack.store.stamp))
3270        self.stack.incStat(self.statKey())
3271
3272    def remove(self, remote=None, index=None):
3273        self.remote.addDoneTransaction(self.tid)
3274        super(Messengent, self).remove(remote, index)
3275