1# -*- coding: utf-8 -*-
2'''
3stacking.py raet protocol stacking classes
4'''
5# pylint: skip-file
6# pylint: disable=W0611
7
8# Import python libs
9import socket
10import os
11import errno
12import sys
13if sys.version_info > (3,):
14    long = int
15
16from collections import deque,  Mapping
17try:
18    import simplejson as json
19except ImportError:
20    import json
21
22try:
23    import msgpack
24except ImportError:
25    mspack = None
26
27# Import ioflo libs
28from ioflo.aid.odicting import odict
29from ioflo.aid.timing import StoreTimer
30from ioflo.base.storing import Store
31
32# Import raet libs
33from .abiding import *  # import globals
34from . import raeting
35from . import keeping
36from . import lotting
37
38from ioflo.base.consoling import getConsole
39console = getConsole()
40
41class Stack(object):
42    '''
43    RAET protocol base stack object.
44    Should be subclassed for specific transport type such as UDP or UXD
45    '''
46    Count = 0
47    Uid = 0 # base for next unique id for local and remotes
48
49    def __init__(self,
50                 store=None,
51                 version=raeting.VERSION,
52                 main=None,
53                 puid=None,
54                 local=None, #passed up from subclass
55                 name='',
56                 uid=None,
57                 server=None,
58                 ha=None,
59                 bufcnt=2,
60                 rxMsgs=None,
61                 txMsgs=None,
62                 rxes=None,
63                 txes=None,
64                 stats=None,
65                ):
66        '''
67        Setup Stack instance
68        '''
69        self.store = store or Store(stamp=0.0)
70
71        self.version = version
72        self.main = main
73
74        if getattr(self, 'puid', None) is None:
75            self.puid = puid if puid is not None else self.Uid
76
77        self.local = local or lotting.Lot(stack=self,
78                                          name=name,
79                                          uid=uid,
80                                          ha=ha,)
81        self.local.stack = self
82
83        self.remotes = self.uidRemotes = odict() # remotes indexed by uid
84        self.nameRemotes = odict() # remotes indexed by name
85
86        self.bufcnt = bufcnt
87        if not server:
88            server = self.serverFromLocal()
89
90        self.server = server
91        if self.server:
92            if not self.server.reopen():  # open socket
93                raise raeting.StackError("Stack '{0}': Failed opening server at"
94                            " '{1}'\n".format(self.name, self.server.ha))
95
96            self.ha = self.server.ha  # update local host address after open
97
98            console.verbose("Stack '{0}': Opened server at '{1}'\n".format(self.name,
99                                                                           self.ha))
100
101        self.rxMsgs = rxMsgs if rxMsgs is not None else deque() # messages received
102        self.txMsgs = txMsgs if txMsgs is not None else deque() # messages to transmit
103        self.rxes = rxes if rxes is not None else deque() # udp packets received
104        self.txes = txes if txes is not None else deque() # udp packet to transmit
105        self.stats = stats if stats is not None else odict() # udp statistics
106        self.statTimer = StoreTimer(self.store)
107
108    @property
109    def name(self):
110        '''
111        property that returns name of local interface
112        '''
113        return self.local.name
114
115    @name.setter
116    def name(self, value):
117        '''
118        setter for name property
119        '''
120        self.local.name = value
121
122    @property
123    def ha(self):
124        '''
125        property that returns host address
126        '''
127        return self.local.ha
128
129    @ha.setter
130    def ha(self, value):
131        self.local.ha = value
132
133    def serverFromLocal(self):
134        '''
135        Create server from local data
136        '''
137        return None
138
139    def nextUid(self):
140        '''
141        Generates next unique id number for local or remotes.
142        '''
143        self.puid += 1
144        if self.puid > long(0xffffffff):
145            self.puid = 1  # rollover to 1
146        return self.puid
147
148    def addRemote(self, remote):
149        '''
150        Add a remote to indexes
151        '''
152        uid = remote.uid
153        # allow for condition where local.uid == 0 and remote.uid == 0
154        if uid in self.uidRemotes or (uid and uid == self.local.uid):
155            emsg = "Cannot add remote at uid '{0}', alreadys exists".format(uid)
156            raise raeting.StackError(emsg)
157        if remote.name in  self.nameRemotes or remote.name == self.local.name:
158            emsg = "Cannot add remote at name '{0}', alreadys exists".format(remote.name)
159            raise raeting.StackError(emsg)
160        remote.stack = self
161        self.uidRemotes[uid] = remote
162        self.nameRemotes[remote.name] = remote
163        return remote
164
165    def moveRemote(self, remote, new):
166        '''
167        Move remote at key remote.uid to new uid and replace the odict key index
168        so order is the same
169        '''
170        old = remote.uid
171
172        if new in self.uidRemotes or new == self.local.uid:
173            emsg = "Cannot move, remote to '{0}', already exists".format(new)
174            raise raeting.StackError(emsg)
175
176        if old not in self.uidRemotes:
177            emsg = "Cannot move remote at '{0}', does not exist".format(old)
178            raise raeting.StackError(emsg)
179
180        if remote is not self.uidRemotes[old]:
181            emsg = "Cannot move remote at '{0}', not identical".format(old)
182            raise raeting.StackError(emsg)
183
184        remote.uid = new
185        index = self.uidRemotes.keys().index(old)
186        del self.uidRemotes[old]
187        self.uidRemotes.insert(index, new, remote)
188
189    def renameRemote(self, remote, new):
190        '''
191        rename remote with old remote.name to new name but keep same index
192        '''
193        old = remote.name
194        if new != old:
195            if new in self.nameRemotes or new == self.local.name:
196                emsg = "Cannot rename remote to '{0}', already exists".format(new)
197                raise raeting.StackError(emsg)
198
199            if old not in self.nameRemotes:
200                emsg = "Cannot rename remote '{0}', does not exist".format(old)
201                raise raeting.StackError(emsg)
202
203            if remote is not self.nameRemotes[old]:
204                emsg = "Cannot rename remote '{0}', not identical".format(old)
205                raise raeting.StackError(emsg)
206
207            remote.name = new
208            index = self.nameRemotes.keys().index(old)
209            del self.nameRemotes[old]
210            self.nameRemotes.insert(index, new, remote)
211
212    def removeRemote(self, remote):
213        '''
214        Remove remote from all remotes dicts
215        '''
216        uid = remote.uid
217        if uid not in self.uidRemotes:
218            emsg = "Cannot remove remote '{0}', does not exist".format(uid)
219            raise raeting.StackError(emsg)
220
221        if remote is not self.uidRemotes[uid]:
222            emsg = "Cannot remove remote '{0}', not identical".format(uid)
223            raise raeting.StackError(emsg)
224
225        del self.uidRemotes[uid]
226        del self.nameRemotes[remote.name]
227
228    def removeAllRemotes(self):
229        '''
230        Remove all the remotes
231        '''
232        remotes = self.remotes.values() #make copy since changing .remotes in-place
233        for remote in remotes:
234            self.removeRemote(remote)
235
236    def fetchUidByName(self, name):
237        '''
238        Search for remote with matching name
239        Return remote if found Otherwise return None
240        '''
241        remote = self.nameRemotes.get(name)
242        return (remote.uid if remote else None)
243
244    def incStat(self, key, delta=1):
245        '''
246        Increment stat key counter by delta
247        '''
248        if key in self.stats:
249            self.stats[key] += delta
250        else:
251            self.stats[key] = delta
252
253    def updateStat(self, key, value):
254        '''
255        Set stat key to value
256        '''
257        self.stats[key] = value
258
259    def clearStat(self, key):
260        '''
261        Set the specified state counter to zero
262        '''
263        if key in self.stats:
264            self.stats[key] = 0
265
266    def clearStats(self):
267        '''
268        Set all the stat counters to zero and reset the timer
269        '''
270        for key, value in self.stats.items():
271            self.stats[key] = 0
272        self.statTimer.restart()
273
274    def _handleOneReceived(self):
275        '''
276        Handle one received message from server
277        assumes that there is a server
278        '''
279        try:
280            rx, ra = self.server.receive()  # if no data the duple is ('',None)
281        except socket.error as ex:
282            err = raeting.get_exception_error(ex)
283            if err == errno.ECONNRESET:
284                return False
285        if not rx:  # no received data
286            return False
287        self.rxes.append((rx, ra))     # duple = ( packet, source address)
288        return True
289
290    def serviceReceives(self):
291        '''
292        Retrieve from server all recieved and put on the rxes deque
293        '''
294        if self.server:
295            while self._handleOneReceived():
296                pass
297
298    def serviceReceiveOnce(self):
299        '''
300        Retrieve from server one recieved and put on the rxes deque
301        '''
302        if self.server:
303            self._handleOneReceived()
304
305    def _handleOneRx(self):
306        '''
307        Handle on message from .rxes deque
308        Assumes that there is a message on the .rxes deque
309        '''
310        raw, sa = self.rxes.popleft()
311        console.verbose("{0} received raw message\n{1}\n".format(self.name, raw))
312        processRx(packet=raw)
313
314    def serviceRxes(self):
315        '''
316        Process all messages in .rxes deque
317        '''
318        while self.rxes:
319            self._handleOneRx()
320
321    def serviceRxOnce(self):
322        '''
323        Process one messages in .rxes deque
324        '''
325        if self.rxes:
326            self._handleOneRx()
327
328    def processRx(self, packet):
329        '''
330        Process
331        '''
332        pass
333
334    def transmit(self, msg, uid=None):
335        '''
336        Append duple (msg, uid) to .txMsgs deque
337        If msg is not mapping then raises exception
338        If uid is None then it will default to the first entry in .remotes
339        '''
340        if not isinstance(msg, Mapping):
341            emsg = "Invalid msg, not a mapping {0}\n".format(msg)
342            console.terse(emsg)
343            self.incStat("invalid_transmit_body")
344            return
345        if uid is None:
346            if not self.remotes:
347                emsg = "No remote to send to\n"
348                console.terse(emsg)
349                self.incStat("invalid_destination")
350                return
351            uid = self.remotes.values()[0].uid
352        self.txMsgs.append((msg, uid))
353
354    def  _handleOneTxMsg(self):
355        '''
356        Take one message from .txMsgs deque and handle it
357        Assumes there is a message on the deque
358        '''
359        body, uid = self.txMsgs.popleft() # duple (body dict, destination uid
360        self.message(body, uid=uid)
361        console.verbose("{0} sending\n{1}\n".format(self.name, body))
362
363    def serviceTxMsgs(self):
364        '''
365        Service .txMsgs queue of outgoing  messages
366        '''
367        while self.txMsgs:
368            self._handleOneTxMsg()
369
370    def serviceTxMsgOnce(self):
371        '''
372        Service one message on .txMsgs queue of outgoing messages
373        '''
374        if self.txMsgs:
375            self._handleOneTxMsg()
376
377    def message(self, body, uid=None):
378        '''
379        Sends message body to remote at uid
380        '''
381        pass
382
383    def tx(self, packed, duid):
384        '''
385        Queue duple of (packed, da) on stack .txes queue
386        Where da is the ip destination (host,port) address associated with
387        the remote identified by duid
388        '''
389        if duid not in self.remotes:
390            msg = "Invalid destination remote id '{0}'".format(duid)
391            raise raeting.StackError(msg)
392        self.txes.append((packed, self.remotes[duid].ha))
393
394    def _handleOneTx(self, laters, blocks):
395        '''
396        Handle one message on .txes deque
397        Assumes there is a message
398        laters is deque of messages to try again later
399        blocks is list of destinations that already blocked on this service
400        '''
401        tx, ta = self.txes.popleft()  # duple = (packet, destination address)
402
403        if ta in blocks: # already blocked on this iteration
404            laters.append((tx, ta)) # keep sequential
405            return
406
407        try:
408            self.server.send(tx, ta)
409        except socket.error as ex:
410            err = raeting.get_exception_error(ex)
411            errors = [errno.EAGAIN,
412                      errno.EWOULDBLOCK,
413                      errno.ENETUNREACH,
414                      errno.EHOSTUNREACH,
415                      errno.EHOSTDOWN,
416                      errno.ECONNRESET]
417            if hasattr(errno, 'ETIME'):
418                errors.append(errno.ETIME)
419            if (err in errors):
420                # problem sending such as busy with last message. save it for later
421                laters.append((tx, ta))
422                blocks.append(ta)
423            else:
424                raise
425
426    def serviceTxes(self):
427        '''
428        Service the .txes deque to send  messages through server
429        '''
430        if self.server:
431            laters = deque()
432            blocks = []
433            while self.txes:
434                self._handleOneTx(laters, blocks)
435            while laters:
436                self.txes.append(laters.popleft())
437
438    def serviceTxOnce(self):
439        '''
440        Service on message on the .txes deque to send through server
441        '''
442        if self.server:
443            laters = deque()
444            blocks = [] # will always be empty since only once
445            if self.txes:
446                self._handleOneTx(laters, blocks)
447            while laters:
448                self.txes.append(laters.popleft())
449
450    def serviceAllRx(self):
451        '''
452        Service:
453           server receive
454           rxes queue
455           process
456        '''
457        self.serviceReceives()
458        self.serviceRxes()
459        self.process()
460
461    def serviceAllTx(self):
462        '''
463        Service:
464           txMsgs queue
465           txes queue to server send
466        '''
467        self.serviceTxMsgs()
468        self.serviceTxes()
469
470    def serviceAll(self):
471        '''
472        Service or Process:
473           server receive
474           rxes queue
475           process
476           txMsgs queue
477           txes queue to server send
478        '''
479        self.serviceAllRx()
480        self.serviceAllTx()
481
482    def serviceServer(self):
483        '''
484        Service the server's receive and transmit queues
485        '''
486        self.serviceReceives()
487        self.serviceTxes()
488
489    def serviceOneAllRx(self):
490        '''
491        Propagate one packet all the way through the received side of the stack
492        Service:
493           server receive
494           rxes queue
495           process
496        '''
497        self.serviceReceiveOnce()
498        self.serviceRxOnce()
499        self.process()
500
501    def serviceOneAllTx(self):
502        '''
503        Propagate one packet all the way through the transmit side of the stack
504        Service:
505           txMsgs queue
506           txes queue to server send
507        '''
508        self.serviceTxMsgOnce()
509        self.serviceTxOnce()
510
511    def process(self):
512        '''
513        Allow timer based processing
514        '''
515        pass
516
517class KeepStack(Stack):
518    '''
519    RAET protocol base stack object with persistance via Keep attribute.
520    Should be subclassed for specific transport type
521    '''
522    Count = 0
523    Uid =  0
524
525    def __init__(self,
526                 puid=None,
527                 clean=False,
528                 cleanlocal=False,
529                 cleanremote=False,
530                 keep=None,
531                 dirpath='',
532                 basedirpath='',
533                 local=None, #passed up from subclass
534                 name='',
535                 uid=None,
536                 ha=None,
537                 **kwa
538                 ):
539        '''
540        Setup Stack instance
541        '''
542        if getattr(self, 'puid', None) is None:
543            self.puid = puid if puid is not None else self.Uid
544
545        self.keep = keep or keeping.LotKeep(dirpath=dirpath,
546                                            basedirpath=basedirpath,
547                                            stackname=name)
548
549        if clean or cleanlocal: # clear persisted data so use provided or default data
550            self.clearLocalKeep()
551
552        local = self.restoreLocal() or local or lotting.Lot(stack=self,
553                                                                 name=name,
554                                                                 uid=uid,
555                                                                 ha=ha,
556                                                                 )
557        local.stack = self
558
559        super(KeepStack, self).__init__(puid=puid,
560                                        local=local,
561                                        **kwa)
562
563        if clean or cleanremote:
564            self.clearRemoteKeeps()
565        self.restoreRemotes() # load remotes from saved data
566
567        for remote in self.remotes.values():
568            remote.nextSid()
569
570        self.dumpLocal() # save local data
571        self.dumpRemotes() # save remote data
572
573    def addRemote(self, remote, dump=False):
574        '''
575        Add a remote  to .remotes
576        '''
577        super(KeepStack, self).addRemote(remote=remote)
578        if dump:
579            self.dumpRemote(remote)
580        return remote
581
582    def moveRemote(self, remote, new, clear=False, dump=False):
583        '''
584        Move remote with key remote.uid old to key new uid and replace
585           the odict key index so the order is the same.
586        If clear then clear the keep file for remote at old
587        If dump then dump the keep file for the remote at new
588        '''
589        #old = remote.uid
590        super(KeepStack, self).moveRemote(remote, new=new)
591        if clear:
592            self.keep.clearRemoteData(remote.name)
593        if dump:
594            self.dumpRemote(remote=remote)
595
596    def renameRemote(self, remote, new, clear=True, dump=False):
597        '''
598        Rename remote with old remote.name to new name but keep same index
599        '''
600        old = remote.name
601        super(KeepStack, self).renameRemote(remote=remote, new=new)
602        if clear:
603            self.keep.clearRemoteData(old)
604        if dump:
605            self.dumpRemote(remote=remote)
606
607    def removeRemote(self, remote, clear=True):
608        '''
609        Remove remote
610        If clear then also remove from disk
611        '''
612        super(KeepStack, self).removeRemote(remote=remote)
613        if clear:
614            self.clearRemote(remote)
615
616    def removeAllRemotes(self, clear=True):
617        '''
618        Remove all the remotes
619        If clear then also remove from disk
620        '''
621        remotes = self.remotes.values() #make copy since changing .remotes in-place
622        for remote in remotes:
623            self.removeRemote(remote, clear=clear)
624
625    def clearAllDir(self):
626        '''
627        Clear out and remove the keep dir and contents
628        '''
629        console.verbose("Stack {0}: Clearing keep dir '{1}'\n".format(
630                                  self.name, self.keep.dirpath))
631        self.keep.clearAllDir()
632
633    def dumpLocal(self):
634        '''
635        Dump keeps of local
636        '''
637        self.keep.dumpLocal(self.local)
638
639    def restoreLocal(self):
640        '''
641        Load self.local from keep file if any and return local
642        Otherwise return None
643        '''
644        local = None
645        data = self.keep.loadLocalData()
646        if data:
647            if self.keep.verifyLocalData(data):
648                local = lotting.Lot(stack=self,
649                                         uid=data['uid'],
650                                         name=data['name'],
651                                         ha=data['ha'],
652                                         sid = data['sid'])
653                self.local = local
654            else:
655                self.keep.clearLocalData()
656        return local
657
658    def clearLocalKeep(self):
659        '''
660        Clear local keep
661        '''
662        self.keep.clearLocalData()
663
664    def dumpRemote(self, remote):
665        '''
666        Dump keeps of remote
667        '''
668        self.keep.dumpRemote(remote)
669
670    def dumpRemotes(self, clear=True):
671        '''
672        Dump all remotes data to keep files
673        If clear then clear all files first
674        '''
675        if clear:
676            self.clearRemotes()
677        for remote in self.remotes.values():
678            self.dumpRemote(remote)
679
680    def restoreRemote(self, name):
681        '''
682        Load, add, and return remote with name if any
683        Otherwise return None
684        '''
685        remote = None
686        data = self.keep.loadRemoteData(name)
687        if data:
688            if self.keep.verifyRemoteData(data):
689                remote = lotting.Lot(stack=self,
690                                  uid=data['uid'],
691                                  name=data['name'],
692                                  ha=data['ha'],
693                                  sid=data['sid'])
694                self.addRemote(remote)
695            else:
696                self.keep.clearRemoteData(name)
697        return remote
698
699    def restoreRemotes(self):
700        '''
701        Load and add remote for each remote file
702        '''
703        keeps = self.keep.loadAllRemoteData()
704        if keeps:
705            for name, data in keeps.items():
706                if self.keep.verifyRemoteData(data):
707                    remote = lotting.Lot(stack=self,
708                                      uid=data['uid'],
709                                      name=data['name'],
710                                      ha=data['ha'],
711                                      sid=data['sid'])
712                    self.addRemote(remote)
713                else:
714                    self.keep.clearRemoteData(name)
715
716    def clearRemote(self, remote):
717        '''
718        Clear remote keep of remote
719        '''
720        self.keep.clearRemoteData(remote.name)
721
722    def clearRemotes(self):
723        '''
724        Clear remote keeps of .remotes
725        '''
726        for remote in self.remotes.values():
727            self.clearRemote(remote)
728
729    def clearRemoteKeeps(self):
730        '''
731        Clear all remote keeps
732        '''
733        self.keep.clearAllRemoteData()
734
735    def clearAllKeeps(self):
736        self.clearLocalKeep()
737        self.clearRemoteKeeps()
738
739