1# -*- coding: utf-8 -*-
2'''
3stacking.py raet protocol stacking classes
4'''
5# pylint: skip-file
6# pylint: disable=W0611
7
8# Import python libs
9import socket
10import os
11import errno
12
13from collections import deque,  Mapping
14try:
15    import simplejson as json
16except ImportError:
17    import json
18
19try:
20    import msgpack
21except ImportError:
22    mspack = None
23
24# Import ioflo libs
25from ioflo.aid.odicting import odict
26from ioflo.base import nonblocking
27
28# Import raet libs
29from ..abiding import *  # import globals
30from .. import raeting
31from ..raeting import PcktKind, TrnsKind, CoatKind, FootKind, BodyKind, HeadKind
32from .. import nacling
33from .. import stacking
34from . import keeping
35from . import packeting
36from . import estating
37from . import transacting
38
39from ioflo.base.consoling import getConsole
40console = getConsole()
41
42class RoadStack(stacking.KeepStack):
43    '''
44    RAET protocol RoadStack for UDP communications. This is the primary
45    network communication system in RAET. A stack does not work in the
46    same way as a socket, instead transmit (tx) and recive (rx) lists become
47    populated or emptied when calls are made to the transmit and recieve
48    methods.
49
50    name
51        The name to give the stack and local estate, if no name is given it will be
52        automatically assigned
53    main
54        Flag indicating if the local estate is a main estate on the road
55    mutable
56        Flag indicating if credentials on road can be changed after initial join
57    veritive
58        Flag indicating if all received message transactions must be verified
59    keep
60        Pass in a keep object, this object can define how stack data
61        including keys is persisted to disk
62    dirpath
63        The location on the filesystem to use for stack caching
64    uid
65        The local estate id, if None is specified a default will be assigned
66    ha
67        The local estate host address, this is a tuple of (network_addr, port) that will
68        be bound to by the stack for accepting incoming packets
69    bufcnt
70        The number of messages to buffer, defaults to 2
71    auto
72        auto acceptance mode indicating how keys should be accepted
73        one of never, once, always
74    period
75        The default iteration timeframe to use for the background management
76        of the presence system. Defaults to 1.0
77    offset
78        The default offset to the start of period
79    interim
80        The default timeout to reap a dead remote
81    role
82        The local estate role identifier for key management
83    '''
84    Count = 0 # count of Stack instances to give unique stack names
85    Hk = HeadKind.raet.value # stack default
86    Bk = BodyKind.json.value # stack default
87    Fk = FootKind.nacl.value # stack default
88    Ck = CoatKind.nacl.value # stack default
89    Bf = False # stack default for bcstflag
90    BurstSize = 0  # stack default for max segments in each burst, 0 = no limit
91    Period = 1.0 # stack default for keep alive
92    Offset = 0.5 # stack default for keep alive
93    Interim = 3600 # stack default for reap timeout
94    JoinerTimeout = 5.0 # stack default for joiner transaction timeout
95    JoinentTimeout = 5.0 # stack default for joinent transaction timeout
96    MsgStaleTimeout = 600.0  # stale messages waiting timeout
97
98    def __init__(self,
99                 puid=None,
100                 keep=None,
101                 dirpath='',
102                 basedirpath='',
103                 auto=None,
104                 local=None, #passed up from subclass
105                 name='',
106                 uid=None, #local estate uid, none means generate it
107                 ha=None,
108                 eha=None,
109                 iha=None,
110                 role=None,
111                 sigkey=None,
112                 prikey=None,
113                 bufcnt=2,
114                 kind=None,
115                 mutable=None,
116                 period=None,
117                 offset=None,
118                 interim=None,
119                 veritive=True,
120                 **kwa
121                 ):
122        '''
123        Setup instance
124
125        '''
126        if getattr(self, 'puid', None) is None:
127            self.puid = puid if puid is not None else self.Uid
128
129        keep = keep or keeping.RoadKeep(dirpath=dirpath,
130                                        basedirpath=basedirpath,
131                                        stackname=name,
132                                        auto=auto)
133
134        ha = ha if ha is not None else ("", raeting.RAET_PORT)
135
136        local = local or estating.LocalEstate(stack=self,
137                                              name=name,
138                                     uid=uid,
139                                     ha=eha or ha,
140                                     iha=iha,
141                                     role=role,
142                                     sigkey=sigkey,
143                                     prikey=prikey, )
144        local.stack = self
145
146        self.aha = ha # init before server is initialized
147
148        # Remotes reference these in their init so create before super
149        self.period = period if period is not None else self.Period
150        self.offset = offset if offset is not None else self.Offset
151        self.interim = interim if interim is not None else self.Interim
152
153        super(RoadStack, self).__init__(puid=puid,
154                                        keep=keep,
155                                        dirpath=dirpath,
156                                        basedirpath=basedirpath,
157                                        local=local,
158                                        bufcnt=bufcnt,
159                                        **kwa)
160        self.kind = kind # application kind associated with the local estate
161        self.mutable = mutable # road data mutability
162        self.veritive = True if veritive else False  # rx message trans must be verified
163        self.joinees = odict() # remotes for vacuous joins, keyed by ha
164        self.alloweds = odict() # allowed remotes keyed by name
165        self.aliveds =  odict() # alived remotes keyed by name
166        self.reapeds =  odict() # reaped remotes keyed by name
167        self.availables = set() # set of available remote names
168
169    @property
170    def ha(self):
171        '''
172        property that returns host address for accepting packets (listening ha)
173        '''
174        return self.aha
175
176    @ha.setter
177    def ha(self, value):
178        self.aha = value
179
180    @property
181    def transactions(self):
182        '''
183        property that returns list of transactions in all remotes
184        '''
185        transactions = []
186        for remote in self.remotes.values():
187            transactions.extend(remote.transactions.values())
188        return transactions
189
190    def serverFromLocal(self):
191        '''
192        Create local listening server for stack
193        '''
194        server = nonblocking.SocketUdpNb(ha=self.ha,
195                                         bufsize=raeting.UDP_MAX_PACKET_SIZE * self.bufcnt)
196        return server
197
198    def addRemote(self, remote, dump=False):
199        '''
200        Add a remote  to .remotes
201        '''
202        super(RoadStack, self).addRemote(remote=remote, dump=dump)
203        if remote.timer.store is not self.store:
204            raise raeting.StackError("Store reference mismatch between remote"
205                                     " '{0}' and stack '{1}'".format(remote.name, stack.name))
206        return remote
207
208    def removeRemote(self, remote, clear=True):
209        '''
210        Remove remote at key uid.
211        If clear then also remove from disk
212        '''
213        super(RoadStack, self).removeRemote(remote=remote, clear=clear)
214        for transaction in remote.transactions.values():
215            transaction.nack()
216
217    def fetchRemoteByKeys(self, sighex, prihex):
218        '''
219        Search for remote with matching (name, sighex, prihex)
220        Return remote if found Otherwise return None
221        '''
222        for remote in self.remotes.values():
223            if (remote.signer.keyhex == sighex or
224                remote.priver.keyhex == prihex):
225                return remote
226
227        return None
228
229    def retrieveRemote(self, uid=None):
230        '''
231        Used when initiating a transaction
232
233        If uid is not None Then returns remote at duid if exists or None
234        If uid is None Then uses first remote unless no remotes then None
235        '''
236        if uid is not None:
237            remote = self.remotes.get(uid, None)
238        else:
239            if self.remotes:
240                remote = self.remotes.values()[0] # zeroth is default
241            else:
242                remote = None
243        return remote
244
245    def createRemote(self, ha):
246        '''
247        Use for vacuous join to create new remote
248        '''
249        if not ha:
250            console.terse("Invalid host address = {0} when creating remote.".format(ha))
251            self.incStat("failed_createremote")
252            return None
253
254        remote = self.newRemote(stack=self,
255                                fuid=0, # vacuous join
256                                sid=0, # always 0 for join
257                                ha=ha) #if ha is not None else dha
258
259        try:
260            self.addRemote(remote) #provisionally add .accepted is None
261        except raeting.StackError as ex:
262            console.terse(str(ex) + '\n')
263            self.incStat("failed_addremote")
264            return None
265
266        return remote
267
268    def newRemote(self, **kwa):
269        '''
270        Used as a wrapper to create new remotes
271        Override to add additional kwa validations
272        '''
273        return estating.RemoteEstate(**kwa)
274
275    def dumpLocalRole(self):
276        '''
277        Dump role keep of local
278        '''
279        self.keep.dumpLocalRole(self.local)
280
281    def restoreLocal(self):
282        '''
283        Load local estate if keeps found and verified and return
284        otherwise return None
285        '''
286        local = None
287        keepData = self.keep.loadLocalData()
288        if keepData:
289            if self.keep.verifyLocalData(keepData):
290                ha = keepData['ha']
291                iha = keepData['iha']
292                aha = keepData['aha']
293                local = estating.LocalEstate(stack=self,
294                                             uid=keepData['uid'],
295                                              name=keepData['name'],
296                                              ha=tuple(ha) if ha else ha,
297                                              iha=tuple(iha) if iha else iha,
298                                              natted=keepData['natted'],
299                                              fqdn=keepData['fqdn'],
300                                              dyned=keepData['dyned'],
301                                              sid=keepData['sid'],
302                                              role=keepData['role'],
303                                              sigkey=keepData['sighex'],
304                                              prikey=keepData['prihex'],)
305                self.puid = keepData['puid']
306                self.aha = tuple(aha) if aha else aha
307                self.local = local
308            else:
309                self.keep.clearLocalData()
310        return local
311
312    def clearLocalRoleKeep(self):
313        '''
314        Clear local keep
315        '''
316        self.keep.clearLocalRoleData()
317
318    def dumpRemoteRole(self, remote):
319        '''
320        Dump keeps of remote
321        '''
322        self.keep.dumpRemoteRole(remote)
323
324    def restoreRemote(self, name):
325        '''
326        Load, add, and return remote with name if any
327        Otherwise return None
328        '''
329        remote = None
330        keepData = self.keep.loadRemoteData(name)
331        if keepData:
332            if self.keep.verifyRemoteData(keepData):
333                ha = keepData['ha']
334                iha = keepData['iha']
335                remote = self.newRemote(ha=tuple(ha) if ha else ha,
336                                        stack=self,
337                                        uid=keepData['uid'],
338                                        fuid=keepData['fuid'],
339                                        name=keepData['name'],
340                                        iha=tuple(iha) if iha else iha,
341                                        natted=keepData['natted'],
342                                        fqdn=keepData['fqdn'],
343                                        dyned=keepData['dyned'],
344                                        sid=keepData['sid'],
345                                        main=keepData['main'],
346                                        kind=keepData['kind'],
347                                        joined=keepData['joined'],
348                                        acceptance=keepData['acceptance'],
349                                        verkey=keepData['verhex'],
350                                        pubkey=keepData['pubhex'],
351                                        role=keepData['role'])
352                if remote:
353                    self.addRemote(remote)
354            else:
355                self.keep.clearRemoteData(name)
356        return remote
357
358    def restoreRemotes(self):
359        '''
360        Load .remotes from valid keep  data if any
361        '''
362        keeps = self.keep.loadAllRemoteData()
363        if keeps:
364            for name, keepData in keeps.items():
365                if self.keep.verifyRemoteData(keepData):
366                    ha = keepData['ha']
367                    iha = keepData['iha']
368                    remote = self.newRemote(ha=tuple(ha) if ha else ha,
369                                            stack=self,
370                                            uid=keepData['uid'],
371                                            fuid=keepData['fuid'],
372                                            name=keepData['name'],
373                                            iha=tuple(iha) if iha else iha,
374                                            natted=keepData['natted'],
375                                            fqdn=keepData['fqdn'],
376                                            dyned=keepData['dyned'],
377                                            sid=keepData['sid'],
378                                            main=keepData['main'],
379                                            kind=keepData['kind'],
380                                            joined=keepData['joined'],
381                                            acceptance=keepData['acceptance'],
382                                            verkey=keepData['verhex'],
383                                            pubkey=keepData['pubhex'],
384                                            role=keepData['role'])
385                    if remote:
386                        self.addRemote(remote)
387                else:
388                    self.keep.clearRemoteData(name)
389
390    def clearRemoteRoleKeeps(self):
391        '''
392        Clear all remote keeps
393        '''
394        self.keep.clearAllRemoteRoleData()
395
396    def clearAllKeeps(self):
397        super(RoadStack, self).clearAllKeeps()
398        self.clearLocalRoleKeep()
399        self.clearRemoteRoleKeeps()
400
401    def manage(self, cascade=False, immediate=False):
402        '''
403        Manage remote estates. Time based processing of remote status such as
404        presence (keep alive) etc.
405
406        cascade induces the alive transactions to run join, allow, alive until
407        failure or alive success
408
409        immediate indicates to run first attempt immediately and not wait for timer
410
411        availables = dict of remotes that are both alive and allowed
412        '''
413        alloweds = odict()
414        aliveds = odict()
415        reapeds = odict()
416        for remote in self.remotes.values(): # should not start anything
417            remote.manage(cascade=cascade, immediate=immediate)
418            if remote.allowed:
419                alloweds[remote.name] = remote
420            if remote.alived:
421                aliveds[remote.name] = remote
422            if remote.reaped:
423                reapeds[remote.name] = remote
424
425        old = set(self.aliveds.keys())
426        current = set(aliveds.keys())
427        plus = current.difference(old)
428        minus = old.difference(current)
429        self.availables = current
430        self.changeds = odict(plus=plus, minus=minus)
431        self.alloweds = alloweds
432        self.aliveds = aliveds
433        self.reapeds = reapeds
434
435    def _handleOneRx(self):
436        '''
437        Handle on message from .rxes deque
438        Assumes that there is a message on the .rxes deque
439        '''
440        raw, sa = self.rxes.popleft()
441        console.verbose("{0} received packet\n{1}\n".format(self.name, raw))
442
443        packet = packeting.RxPacket(stack=self, packed=raw)
444        try:
445            packet.parseOuter()
446        except raeting.PacketError as ex:
447            console.terse(str(ex) + '\n')
448            self.incStat('parsing_outer_error')
449            return
450
451        sh, sp = sa
452        packet.data.update(sh=sh, sp=sp)
453        self.processRx(packet)
454
455    def processRx(self, packet):
456        '''
457        Process packet via associated transaction or
458        reply with new correspondent transaction
459        '''
460        console.profuse("{0} received packet data\n{1}\n".format(self.name, packet.data))
461        console.verbose("{0} received packet index: (rf={1[0]}, le={1[1]}, re={1[2]},"
462                        " si={1[3]}, ti={1[4]}, bf={1[5]})\n".format(self.name, packet.index))
463        try:
464            tkname = TrnsKind(packet.data['tk'])
465        except ValueError as ex:
466            tkname = None
467        try:
468            pkname = PcktKind(packet.data['pk'])
469        except ValueError as ex:
470            pkname = None
471        console.verbose("{0} received trans kind = '{1}' packet kind = '{2}'"
472                        "\n".format(self.name, tkname, pkname))
473
474        bf = packet.data['bf']
475        if bf:
476            return  # broadcast transaction not yet supported
477
478        de = packet.data['de']  # remote nuid
479        se = packet.data['se']  # remote fuid
480        tk = packet.data['tk']
481        pk = packet.data['pk']
482        cf = packet.data['cf']
483        rsid = packet.data['si']
484
485        remote = None
486
487        if tk in [TrnsKind.join]: # join transaction
488            sha = (packet.data['sh'],  packet.data['sp'])
489            if rsid != 0: # join  must use sid == 0
490                emsg = ("Stack '{0}'. Nonzero join sid '{1}' in packet from {2}."
491                        " Dropping...\n".format(self.name, rsid, sha))
492                console.terse(emsg)
493                self.incStat('join_invalid_sid')
494                return
495
496            if cf: # cf = not rf, packet source is joinent, destination (self) is joiner
497                if de == 0: # invalid since joiner rxed packet de (nuid) == 0
498                    emsg = ("Stack '{0}'. Invalid join correspondence from '{1}',"
499                            " nuid zero . Dropping...\n".format(self.name, sha))
500                    console.terse(emsg)
501                    self.incStat('join_invalid_nuid')
502                    return
503                if se == 0: # vacuous join since se (fuid) == 0
504                    remote = self.joinees.get(sha, None)  # match remote by rha from .joinees
505                    if remote and remote.nuid != de: # prior different
506                        emsg = ("Stack '{0}'. Invalid join correspondence from '{1}',"
507                                "nuid  {2} mismatch prior {3} . Dropping...\n".format(
508                                self.name, sha, de, remote.nuid))
509                        self.incStat('join_mismatch_nuid') # drop inconsistent nuid
510                        return
511                else: # non vacuous join match by nuid from .remotes
512                    remote = self.remotes.get(de, None)
513
514            else: # (rf = not cf) # source is joiner, destination (self) is joinent
515                if se == 0: # invalid join
516                    emsg = ("Stack '{0}'. Invalid join initiatance from '{1}',"
517                            "fuid zero. Dropping...\n".format(self.name, sha))
518                    console.terse(emsg)
519                    self.incStat('join_invalid_fuid')
520                    return
521                if de == 0: # vacuous join match remote by rha from joinees
522                    remote = self.joinees.get(sha, None)
523                    if remote and remote.fuid != se: # check if prior is stale
524                        if remote.fuid != 0:  # stale
525                            emsg = ("Stack '{0}'. Prior stale join initiatance from '{1}',"
526                                    " fuid {2} mismatch prior {3}. Removing prior...\n".format(
527                                    self.name, sha, se, remote.fuid))
528                            console.terse(emsg)
529                            del self.joinees[sha] # remove prior stale vacuous joinee
530                            remote = None # reset
531
532                    if not remote: # no current joinees for remote initiator at rha
533                        # is it not first packet of join
534                        if pk not in [PcktKind.request]:
535                            emsg = ("Stack '{0}'. Stale join initiatance from '{1}',"
536                                    " Not a request and no remote. Dropping...\n".format(
537                                    self.name, sha))
538                            console.terse(emsg)
539                            self.incStat('join_stale')
540                            return
541
542                        # create vacuous remote will be assigned to joinees in joinent
543                        remote = self.newRemote(stack=self,
544                                                fuid=0,  # was fuid=se
545                                                sid=rsid,
546                                                ha=sha)
547
548                else: # nonvacuous join match by nuid from .remotes
549                    remote = self.remotes.get(de, None)
550                    if not remote: # remote with nuid not exist
551                        # reject renew , so tell it to retry with vacuous join
552                        emsg = ("Stack '{0}'. Stale nuid '{1}' in packet from {2}."
553                                " Renewing....\n".format( self.name, de, sha))
554                        console.terse(emsg)
555                        self.incStat('stale_nuid')
556                        remote = self.newRemote(stack=self,
557                                                fuid=se,
558                                                sid=rsid,
559                                                ha=sha)
560                        if not remote:
561                            return
562
563                        self.replyStale(packet, remote, renew=True) # nack stale transaction
564                        return
565
566        else: # not join transaction
567            if rsid == 0: # cannot use sid == 0 on nonjoin transaction
568                emsg = ("Stack '{0}'. Invalid Zero sid '{1}' for transaction {2} packet"
569                        " {3}. Dropping...\n".format(self.name, rsid, tk, pk))
570                console.terse(emsg)
571                self.incStat('invalid_sid')
572                return
573
574            if de == 0 or se == 0:
575                emsg = ("Stack '{0}'. Invalid nonjoin from remote '{1}'."
576                        " Zero nuid {2} or fuid {3}. Dropping...\n".format(
577                            self.name, sha, de, se))
578                console.terse(emsg)
579                self.incStat('invalid_uid')
580                return
581
582            remote = self.remotes.get(de, None)
583
584            if remote:
585                if not cf: # packet from remotely initiated transaction
586                    if not remote.validRsid(rsid): # invalid rsid
587                        emsg = ("Stack '{0}'. Invalid nonjoin from '{1}'. Invalid sid "
588                                " {2} in packet given prior sid {3}. "
589                                "Dropping...\n".format(self.name,
590                                                       remote.name,
591                                                        rsid,
592                                                        remote.rsid))
593                        console.terse(emsg)
594                        self.incStat('stale_sid')
595                        self.replyStale(packet, remote) # nack stale transaction
596                        return
597
598                    if rsid != remote.rsid: # updated valid rsid so change remote.rsid
599                        remote.rsid = rsid
600                        remote.removeStaleCorrespondents()
601
602                if remote.reaped:
603                    remote.unreap() # packet a valid packet so remote is not dead
604
605        if remote:
606            trans = remote.transactions.get(packet.index, None)
607            if trans:
608                trans.receive(packet)
609                return
610
611        if cf:  # cf = not rf packet from correspondent to non-existent locally initiated transaction
612            self.stale(packet)
613            return
614
615        if not remote:
616            emsg = ("Stack '{0}'. Unknown remote destination '{1}'. "
617                    "Dropping...\n".format(self.name, de))
618            console.terse(emsg)
619            self.incStat('unknown_destination_uid')
620            return
621
622        self.correspond(packet, remote) # correspond to new transaction initiated by remote
623
624    def correspond(self, packet, remote):
625        '''
626        Create correspondent transaction remote and handle packet
627        '''
628        if (packet.data['tk'] == TrnsKind.join and
629            packet.data['pk'] == PcktKind.request):
630            self.replyJoin(packet, remote)
631            return
632
633        if (packet.data['tk'] == TrnsKind.allow and
634            packet.data['pk'] == PcktKind.hello):
635            self.replyAllow(packet, remote)
636            return
637
638        if (packet.data['tk'] == TrnsKind.alive and
639            packet.data['pk'] == PcktKind.request):
640            self.replyAlive(packet, remote)
641            return
642
643        if (packet.data['tk'] == TrnsKind.message and
644            packet.data['pk'] == PcktKind.message):
645            # transaction with this ID already handled and removed packet is a stale resend
646            if packet.data['af'] or packet.data['ti'] in remote.doneTransactions:
647                self.replyStale(packet, remote)
648            else:
649                self.replyMessage(packet, remote)
650            return
651
652        self.incStat('stale_packet')
653
654    def process(self):
655        '''
656        Call .process or all remotes to allow timer based processing
657        of their transactions
658        '''
659        #for transaction in self.transactions.values():
660            #transaction.process()
661        for remote in self.remotes.values():
662            remote.process()
663
664    def parseInner(self, packet):
665        '''
666        Parse inner of packet and return
667        Assume all drop checks done
668        '''
669        try:
670            packet.parseInner()
671            console.verbose("Stack '{0}'. Received packet body\n{1}\n".format(
672                self.name, packet.body.data))
673        except raeting.PacketError as ex:
674            console.terse(str(ex) + '\n')
675            self.incStat('parsing_inner_error')
676            return None
677        return packet
678
679    def stale(self, packet):
680        '''
681        Initiate stale transaction in order to nack a stale correspondent packet
682        but only for preexisting remotes
683        '''
684        if packet.data['pk'] in [PcktKind.nack,
685                                 PcktKind.unjoined,
686                                PcktKind.unallowed,
687                                PcktKind.renew,
688                                PcktKind.refuse,
689                                PcktKind.reject,]:
690            return # ignore stale nacks
691        create = False
692        uid = packet.data['de']
693        remote = self.retrieveRemote(uid=uid)
694        if not remote:
695            emsg = "Stack '{0}'. Unknown remote id '{1}', fail to nack stale\n".format(
696                self.name, uid)
697            console.terse(emsg)
698            self.incStat('invalid_remote_eid')
699            return
700        data = odict(hk=self.Hk, bk=self.Bk)
701        staler = transacting.Staler(stack=self,
702                                    remote=remote,
703                                    kind=packet.data['tk'],
704                                    sid=packet.data['si'],
705                                    tid=packet.data['ti'],
706                                    txData=data,
707                                    rxPacket=packet)
708        staler.nack()
709
710    def replyStale(self, packet, remote, renew=False):
711        '''
712        Correspond to stale initiated transaction
713        '''
714        if packet.data['pk'] in [PcktKind.nack,
715                                 PcktKind.unjoined,
716                                 PcktKind.unallowed,
717                                 PcktKind.refuse,
718                                 PcktKind.reject,]:
719            return # ignore stale nacks
720        data = odict(hk=self.Hk, bk=self.Bk)
721        stalent = transacting.Stalent(stack=self,
722                                      remote=remote,
723                                      kind=packet.data['tk'],
724                                      sid=packet.data['si'],
725                                      tid=packet.data['ti'],
726                                      txData=data,
727                                      rxPacket=packet)
728        if renew:
729            stalent.nack(kind=PcktKind.renew.value) # refuse and renew
730        else:
731            stalent.nack()
732
733    def join(self, uid=None, timeout=None, cascade=False, renewal=False):
734        '''
735        Initiate join transaction
736        '''
737        remote = self.retrieveRemote(uid=uid)
738        if not remote:
739            emsg = "Invalid remote destination estate id '{0}'\n".format(uid)
740            console.terse(emsg)
741            self.incStat('invalid_remote_eid')
742            return
743
744        timeout = timeout if timeout is not None else self.JoinerTimeout
745        data = odict(hk=self.Hk, bk=self.Bk)
746        joiner = transacting.Joiner(stack=self,
747                                    remote=remote,
748                                    timeout=timeout,
749                                    txData=data,
750                                    cascade=cascade,
751                                    renewal=renewal)
752        joiner.join()
753
754    def replyJoin(self, packet, remote, timeout=None):
755        '''
756        Correspond to new join transaction
757        '''
758        timeout = timeout if timeout is not None else self.JoinentTimeout
759        data = odict(hk=self.Hk, bk=self.Bk)
760        joinent = transacting.Joinent(stack=self,
761                                      remote=remote,
762                                      timeout=timeout,
763                                      sid=packet.data['si'],
764                                      tid=packet.data['ti'],
765                                      txData=data,
766                                      rxPacket=packet)
767        joinent.join()
768
769    def allow(self, uid=None, timeout=None, cascade=False):
770        '''
771        Initiate allow transaction
772        '''
773        remote = self.retrieveRemote(uid=uid)
774        if not remote:
775            emsg = "Invalid remote destination estate id '{0}'\n".format(uid)
776            console.terse(emsg)
777            self.incStat('invalid_remote_eid')
778            return
779        data = odict(hk=self.Hk, bk=BodyKind.raw.value, fk=self.Fk)
780        allower = transacting.Allower(stack=self,
781                                      remote=remote,
782                                      timeout=timeout,
783                                      txData=data,
784                                      cascade=cascade)
785        allower.hello()
786
787    def replyAllow(self, packet, remote):
788        '''
789        Correspond to new allow transaction
790        '''
791        data = odict(hk=self.Hk, bk=BodyKind.raw.value, fk=self.Fk)
792        allowent = transacting.Allowent(stack=self,
793                                        remote=remote,
794                                        sid=packet.data['si'],
795                                        tid=packet.data['ti'],
796                                        txData=data,
797                                        rxPacket=packet)
798        allowent.hello()
799
800    def alive(self, uid=None, timeout=None, cascade=False):
801        '''
802        Initiate alive transaction
803        If duid is None then create remote at ha
804        '''
805        remote = self.retrieveRemote(uid=uid)
806        if not remote:
807            emsg = "Invalid remote destination estate id '{0}'\n".format(uid)
808            console.terse(emsg)
809            self.incStat('invalid_remote_eid')
810            return
811        data = odict(hk=self.Hk, bk=self.Bk, fk=self.Fk, ck=self.Ck)
812        aliver = transacting.Aliver(stack=self,
813                                    remote=remote,
814                                    timeout=timeout,
815                                    txData=data,
816                                    cascade=cascade)
817        aliver.alive()
818
819    def replyAlive(self, packet, remote):
820        '''
821        Correspond to new Alive transaction
822        '''
823        data = odict(hk=self.Hk, bk=self.Bk, fk=self.Fk, ck=self.Ck)
824        alivent = transacting.Alivent(stack=self,
825                                      remote=remote,
826                                      bcst=packet.data['bf'],
827                                      sid=packet.data['si'],
828                                      tid=packet.data['ti'],
829                                      txData=data,
830                                      rxPacket=packet)
831        alivent.alive()
832
833    def transmit(self, msg, uid=None, timeout=None):
834        '''
835        Append duple (msg, uid) to .txMsgs deque
836        If msg is not mapping then raises exception
837        If uid is None then it will default to the first entry in .remotes
838        If timeout is None then it will use Messenger default
839        timeout of 0 means never timeout of message transaction
840        '''
841        if not isinstance(msg, Mapping):
842            emsg = "Invalid msg, not a mapping {0}\n".format(msg)
843            console.terse(emsg)
844            self.incStat("invalid_transmit_body")
845            return
846        if uid is None:
847            if not self.remotes:
848                emsg = "No remote to send to\n"
849                console.terse(emsg)
850                self.incStat("invalid_destination")
851                return
852            uid = self.remotes.values()[0].uid
853        self.txMsgs.append((msg, uid, timeout))
854
855    def  _handleOneTxMsg(self):
856        '''
857        Take one message from .txMsgs deque and handle it
858        Assumes there is a message on the deque
859        '''
860        # triple (body dict, destination uid, timout)
861        body, uid, timeout = self.txMsgs.popleft()
862        self.message(body, uid=uid, timeout=timeout)
863        console.verbose("{0} sending\n{1}\n".format(self.name, body))
864
865    def message(self, body, uid=None, timeout=None):
866        '''
867        Initiate message transaction to remote at duid
868        If uid is None then create remote at ha
869        If timeout is None then use Messenger default
870        If timeout is 0 then never timeout
871        '''
872        remote = self.retrieveRemote(uid=uid)
873        if not remote:
874            emsg = "Invalid remote destination estate id '{0}'\n".format(uid)
875            console.terse(emsg)
876            self.incStat('invalid_remote_uid')
877            return
878        data = odict(hk=self.Hk, bk=self.Bk, fk=self.Fk, ck=self.Ck)
879        messenger = transacting.Messenger(stack=self,
880                                          remote=remote,
881                                          timeout=timeout,
882                                          txData=data,
883                                          bcst=self.Bf,
884                                          burst=self.BurstSize)
885        messenger.message(body)
886
887    def replyMessage(self, packet, remote):
888        '''
889        Correspond to new Message transaction
890        '''
891        data = odict(hk=self.Hk, bk=self.Bk, fk=self.Fk, ck=self.Ck)
892        messengent = transacting.Messengent(stack=self,
893                                            remote=remote,
894                                            bcst=packet.data['bf'],
895                                            sid=packet.data['si'],
896                                            tid=packet.data['ti'],
897                                            txData=data,
898                                            rxPacket=packet)
899        messengent.message()
900
901