1# -*- test-case-name: epsilon.test.test_juice -*-
2# Copyright 2005 Divmod, Inc.  See LICENSE file for details
3
4import warnings, pprint
5import keyword
6import io
7
8import six
9from twisted.internet.main import CONNECTION_LOST
10from twisted.internet.defer import Deferred, maybeDeferred, fail
11from twisted.internet.protocol import ServerFactory, ClientFactory
12from twisted.internet.ssl import Certificate
13from twisted.python.failure import Failure
14from twisted.python import log, filepath
15
16from epsilon.liner import LineReceiver
17from epsilon.compat import long
18from epsilon import extime
19
20ASK = '_ask'
21ANSWER = '_answer'
22COMMAND = '_command'
23ERROR = '_error'
24ERROR_CODE = '_error_code'
25ERROR_DESCRIPTION = '_error_description'
26LENGTH = '_length'
27BODY = 'body'
28
29debug = False
30
31class JuiceBox(dict):
32    """ I am a packet in the JUICE protocol.  """
33
34    def __init__(self, __body='', **kw):
35        self.update(kw)
36        if __body:
37            assert isinstance(__body, str), "body must be a string: %r" % ( repr(__body),)
38            self['body'] = __body
39
40    def body():
41        def get(self):
42            warnings.warn("body attribute of boxes is now just a regular field",
43                          stacklevel=2)
44            return self['body']
45        def set(self, newbody):
46            warnings.warn("body attribute of boxes is now just a regular field",
47                          stacklevel=2)
48            self['body'] = newbody
49        return get,set
50    body = property(*body())
51
52    def copy(self):
53        newBox = self.__class__()
54        newBox.update(self)
55        return newBox
56
57    def serialize(self,
58                  delimiter=b'\r\n',
59                  escaped=b'\r\n '):
60        assert LENGTH not in self
61        delimiter = six.ensure_binary(delimiter)
62        escaped = six.ensure_binary(escaped)
63
64        L = []
65        for (k, v) in six.viewitems(self):
66            if k == BODY:
67                k = LENGTH
68                v = str(len(self[BODY]))
69            L.append(six.ensure_binary(k).replace(b'_', b'-').title())
70            L.append(b': ')
71            L.append(six.ensure_binary(v).replace(delimiter, escaped))
72            L.append(delimiter)
73
74        L.append(delimiter)
75        if BODY in self:
76            L.append(six.ensure_binary(self[BODY]))
77        return b''.join(L)
78
79    def sendTo(self, proto):
80        """
81        Serialize and send this box to a Juice instance.  By the time it is
82        being sent, several keys are required.  I must have exactly ONE of::
83
84            -ask
85            -answer
86            -error
87
88        If the '-ask' header is set, then the '-command' header must also be
89        set.
90        """
91        proto.sendPacket(self)
92
93# juice.Box => JuiceBox
94
95Box = JuiceBox
96
97class TLSBox(JuiceBox):
98    def __repr__(self):
99        return 'TLS(**%s)' % (super(TLSBox, self).__repr__(),)
100
101
102    def __init__(self, __certificate, __verify=None, __sslstarted=None, **kw):
103        super(TLSBox, self).__init__(**kw)
104        self.certificate = __certificate
105        self.verify = __verify
106        self.sslstarted = __sslstarted
107
108    def sendTo(self, proto):
109        super(TLSBox, self).sendTo(proto)
110        if self.verify is None:
111            proto.startTLS(self.certificate)
112        else:
113            proto.startTLS(self.certificate, self.verify)
114        if self.sslstarted is not None:
115            self.sslstarted()
116
117class QuitBox(JuiceBox):
118    def __repr__(self):
119        return 'Quit(**%s)' % (super(QuitBox, self).__repr__(),)
120
121
122    def sendTo(self, proto):
123        super(QuitBox, self).sendTo(proto)
124        proto.transport.loseConnection()
125
126class _SwitchBox(JuiceBox):
127    def __repr__(self):
128        return 'Switch(**%s)' % (super(_SwitchBox, self).__repr__(),)
129
130
131    def __init__(self, __proto, **kw):
132        super(_SwitchBox, self).__init__(**kw)
133        self.innerProto = __proto
134
135    def sendTo(self, proto):
136        super(_SwitchBox, self).sendTo(proto)
137        proto._switchTo(self.innerProto)
138
139
140
141class NegotiateBox(JuiceBox):
142    def __repr__(self):
143        return 'Negotiate(**%s)' % (super(NegotiateBox, self).__repr__(),)
144
145
146    def sendTo(self, proto):
147        super(NegotiateBox, self).sendTo(proto)
148        proto._setProtocolVersion(int(self['version']))
149
150
151
152class JuiceError(Exception):
153    pass
154
155class RemoteJuiceError(JuiceError):
156    """
157    This error indicates that something went wrong on the remote end of the
158    connection, and the error was serialized and transmitted to you.
159    """
160    def __init__(self, errorCode, description, fatal=False):
161        """Create a remote error with an error code and description.
162        """
163        Exception.__init__(self, "Remote[%s]: %s" % (errorCode, description))
164        self.errorCode = errorCode
165        self.description = description
166        self.fatal = fatal
167
168class UnhandledRemoteJuiceError(RemoteJuiceError):
169    def __init__(self, description):
170        errorCode = b"UNHANDLED"
171        RemoteJuiceError.__init__(self, errorCode, description)
172
173class JuiceBoxError(JuiceError):
174    pass
175
176class MalformedJuiceBox(JuiceBoxError):
177    pass
178
179class UnhandledCommand(JuiceError):
180    pass
181
182
183class IncompatibleVersions(JuiceError):
184    pass
185
186class _Transactor:
187    def __init__(self, store, callable):
188        self.store = store
189        self.callable = callable
190
191    def __call__(self, box):
192        return self.store.transact(self.callable, box)
193
194    def __repr__(self):
195        return '<Transaction in: %s of: %s>' % (self.store, self.callable)
196
197class DispatchMixin:
198    baseDispatchPrefix = 'juice_'
199    autoDispatchPrefix = 'command_'
200
201    wrapper = None
202
203    def _auto(self, aCallable, proto, namespace=None):
204        if aCallable is None:
205            return None
206        command = aCallable.command
207        if namespace not in command.namespaces:
208            # if you're in the wrong namespace, you are very likely not allowed
209            # to invoke the command you are trying to invoke.  some objects
210            # have commands exposed in a separate namespace for security
211            # reasons, since the security model is a role : namespace mapping.
212            log.msg('WRONG NAMESPACE: %r, %r' % (namespace, command.namespaces))
213            return None
214        def doit(box):
215            kw = stringsToObjects(box, command.arguments, proto)
216            for name, extraArg in command.extra:
217                kw[name] = extraArg.fromTransport(proto.transport)
218#             def checkIsDict(result):
219#                 if not isinstance(result, dict):
220#                     raise RuntimeError("%r returned %r, not dictionary" % (
221#                             aCallable, result))
222#                 return result
223            def checkKnownErrors(error):
224                key = error.trap(*command.allErrors)
225                code = command.allErrors[key]
226                desc = str(error.value)
227                return Failure(RemoteJuiceError(
228                        code, desc, error in command.fatalErrors))
229            return maybeDeferred(aCallable, **kw).addCallback(
230                command.makeResponse, proto).addErrback(
231                checkKnownErrors)
232        return doit
233
234    def _wrap(self, aCallable):
235        if aCallable is None:
236            return None
237        wrap = self.wrapper
238        if wrap is not None:
239            return wrap(aCallable)
240        else:
241            return aCallable
242
243    def normalizeCommand(self, cmd):
244        """Return the canonical form of a command.
245        """
246        return cmd.upper().strip().replace('-', '_')
247
248    def lookupFunction(self, proto, name, namespace):
249        """Return a callable to invoke when executing the named command.
250        """
251        # Try to find a method to be invoked in a transaction first
252        # Otherwise fallback to a "regular" method
253        fName = self.autoDispatchPrefix + name
254        fObj = getattr(self, fName, None)
255        if fObj is not None:
256            # pass the namespace along
257            return self._auto(fObj, proto, namespace)
258
259        assert namespace is None, 'Old-style parsing'
260        # Fall back to simplistic command dispatching - we probably want to get
261        # rid of this eventually, there's no reason to do extra work and write
262        # fewer docs all the time.
263        fName = self.baseDispatchPrefix + name
264        return getattr(self, fName, None)
265
266    def dispatchCommand(self, proto, cmd, box, namespace=None):
267        fObj = self.lookupFunction(proto, self.normalizeCommand(cmd), namespace)
268        if fObj is None:
269            return fail(UnhandledCommand(cmd))
270        return maybeDeferred(self._wrap(fObj), box)
271
272def normalizeKey(key):
273    lkey = six.ensure_str(key).lower().replace('-', '_')
274    if keyword.iskeyword(lkey):
275        return lkey.title()
276    return lkey
277
278
279def parseJuiceHeaders(lines):
280    """
281    Create a JuiceBox from a list of header lines.
282
283    @param lines: a list of lines.
284    @type lines: a list of L{bytes}
285    """
286    b = JuiceBox()
287    key = None
288    for L in lines:
289        if L[0:1] == b' ':
290            # continuation
291            assert key is not None
292            b[key] += six.ensure_str(b'\r\n' + L[1:])
293            continue
294        parts = L.split(b': ', 1)
295        if len(parts) != 2:
296            raise MalformedJuiceBox("Wrong number of parts: %r" % (L,))
297        key, value = parts
298        key = normalizeKey(key)
299        b[key] = six.ensure_str(value)
300    return int(b.pop(LENGTH, 0)), b
301
302class JuiceParserBase(DispatchMixin):
303
304    def __init__(self):
305        self._outstandingRequests = {}
306
307    def _puke(self, failure):
308        log.msg("Juice server or network failure "
309                "unhandled by client application:")
310        log.err(failure)
311        log.msg(
312            "Dropping connection!  "
313            "To avoid, add errbacks to ALL remote commands!")
314        if self.transport is not None:
315            self.transport.loseConnection()
316
317    _counter = long(0)
318
319    def _nextTag(self):
320        self._counter += 1
321        return '%x' % (self._counter,)
322
323    def failAllOutgoing(self, reason):
324        OR = self._outstandingRequests.items()
325        self._outstandingRequests = None # we can never send another request
326        for key, value in OR:
327            value.errback(reason)
328
329    def juiceBoxReceived(self, box):
330        if debug:
331            log.msg("Juice receive: %s" % pprint.pformat(dict(six.viewitems(box))))
332
333        if ANSWER in box:
334            question = self._outstandingRequests.pop(box[ANSWER])
335            question.addErrback(self._puke)
336            self._wrap(question.callback)(box)
337        elif ERROR in box:
338            question = self._outstandingRequests.pop(box[ERROR])
339            question.addErrback(self._puke)
340            self._wrap(question.errback)(
341                Failure(RemoteJuiceError(box[ERROR_CODE],
342                                         box[ERROR_DESCRIPTION])))
343        elif COMMAND in box:
344            cmd = box[COMMAND]
345            def sendAnswer(answerBox):
346                if ASK not in box:
347                    return
348                if self.transport is None:
349                    return
350                answerBox[ANSWER] = box[ASK]
351                answerBox.sendTo(self)
352            def sendError(error):
353                if ASK not in box:
354                    return error
355                if error.check(RemoteJuiceError):
356                    code = error.value.errorCode
357                    desc = error.value.description
358                    if error.value.fatal:
359                        errorBox = QuitBox()
360                    else:
361                        errorBox = JuiceBox()
362                else:
363                    errorBox = QuitBox()
364                    log.err(error) # here is where server-side logging happens
365                                   # if the error isn't handled
366                    code = 'UNHANDLED'
367                    desc = "Unhandled Remote System Exception "
368                errorBox[ERROR] = box[ASK]
369                errorBox[ERROR_DESCRIPTION] = desc
370                errorBox[ERROR_CODE] = code
371                if self.transport is not None:
372                    errorBox.sendTo(self)
373                return None # intentionally stop the error here: don't log the
374                            # traceback if it's handled, do log it (earlier) if
375                            # it isn't
376            self.dispatchCommand(self, cmd, box).addCallbacks(sendAnswer, sendError
377                                                              ).addErrback(self._puke)
378        else:
379            raise RuntimeError(
380                "Empty packet received over connection-oriented juice: %r" % (box,))
381
382    def sendBoxCommand(self, command, box, requiresAnswer=True):
383        """
384        Send a command across the wire with the given C{juice.Box}.
385
386        Returns a Deferred which fires with the response C{juice.Box} when it
387        is received, or fails with a C{juice.RemoteJuiceError} if an error is
388        received.
389
390        If the Deferred fails and the error is not handled by the caller of
391        this method, the failure will be logged and the connection dropped.
392        """
393        if self._outstandingRequests is None:
394            return fail(CONNECTION_LOST)
395        box[COMMAND] = command
396        tag = self._nextTag()
397        if requiresAnswer:
398            box[ASK] = tag
399            result = self._outstandingRequests[tag] = Deferred()
400        else:
401            result = None
402        box.sendTo(self)
403        return result
404
405
406
407
408
409
410class Argument:
411    optional = False
412
413    def __init__(self, optional=False):
414        self.optional = optional
415
416    def retrieve(self, d, name):
417        if self.optional:
418            value = d.get(name)
419            if value is not None:
420                del d[name]
421        else:
422            value = d.pop(name)
423        return value
424
425    def fromBox(self, name, strings, objects, proto):
426        st = self.retrieve(strings, name)
427        if self.optional and st is None:
428            objects[name] = None
429        else:
430            objects[name] = self.fromStringProto(st, proto)
431
432    def toBox(self, name, strings, objects, proto):
433        obj = self.retrieve(objects, name)
434        if self.optional and obj is None:
435            # strings[name] = None
436            return
437        else:
438            strings[name] = self.toStringProto(obj, proto)
439
440    def fromStringProto(self, inString, proto):
441        return self.fromString(inString)
442
443    def toStringProto(self, inObject, proto):
444        return self.toString(inObject)
445
446    def fromString(self, inString):
447        raise NotImplementedError()
448
449    def toString(self, inObject):
450        raise NotImplementedError()
451
452class JuiceList(Argument):
453    def __init__(self, subargs):
454        self.subargs = subargs
455
456    def fromStringProto(self, inString, proto):
457        boxes = parseString(six.ensure_binary(inString))
458        values = [stringsToObjects(box, self.subargs, proto)
459                  for box in boxes]
460        return values
461
462    def toStringProto(self, inObject, proto):
463        return b''.join([
464            objectsToStrings(objects, self.subargs, Box(), proto).serialize()
465            for objects in inObject
466        ])
467
468class ListOf(Argument):
469    def __init__(self, subarg, delimiter=', '):
470        self.subarg = subarg
471        self.delimiter = delimiter
472
473    def fromStringProto(self, inString, proto):
474        strings = inString.split(self.delimiter)
475        L = [self.subarg.fromStringProto(string, proto)
476             for string in strings]
477        return L
478
479    def toStringProto(self, inObject, proto):
480        L = []
481        for inSingle in inObject:
482            outString = self.subarg.toStringProto(inSingle, proto)
483            assert self.delimiter not in outString
484            L.append(outString)
485        return self.delimiter.join(L)
486
487class Integer(Argument):
488    fromString = int
489    def toString(self, inObject):
490        return str(int(inObject))
491
492class String(Argument):
493    def toString(self, inObject):
494        return inObject
495    def fromString(self, inString):
496        return inString
497
498class EncodedString(Argument):
499
500    def __init__(self, encoding):
501        self.encoding = encoding
502
503    def toString(self, inObject):
504        return inObject.encode(self.encoding)
505
506    def fromString(self, inString):
507        return inString.decode(self.encoding)
508
509# Temporary backwards compatibility for Exponent
510
511Body = String
512
513class Unicode(String):
514    def toString(self, inObject):
515        # assert isinstance(inObject, unicode)
516        return String.toString(self, inObject.encode('utf-8'))
517
518    def fromString(self, inString):
519        # assert isinstance(inString, str)
520        return String.fromString(self, inString).decode('utf-8')
521
522class Path(Unicode):
523    def fromString(self, inString):
524        return filepath.FilePath(Unicode.fromString(self, inString))
525
526    def toString(self, inObject):
527        return Unicode.toString(self, inObject.path)
528
529
530class Float(Argument):
531    fromString = float
532    toString = str
533
534class Base64Binary(Argument):
535    def toString(self, inObject):
536        return inObject.encode('base64').replace('\n', '')
537    def fromString(self, inString):
538        return inString.decode('base64')
539
540class Time(Argument):
541    def toString(self, inObject):
542        return inObject.asISO8601TimeAndDate()
543    def fromString(self, inString):
544        return extime.Time.fromISO8601TimeAndDate(inString)
545
546class ExtraArg:
547    def fromTransport(self, inTransport):
548        raise NotImplementedError()
549
550class Peer(ExtraArg):
551    def fromTransport(self, inTransport):
552        return inTransport.getQ2QPeer()
553
554class PeerDomain(ExtraArg):
555    def fromTransport(self, inTransport):
556        return inTransport.getQ2QPeer().domain
557
558class PeerUser(ExtraArg):
559    def fromTransport(self, inTransport):
560        return inTransport.getQ2QPeer().resource
561
562class Host(ExtraArg):
563    def fromTransport(self, inTransport):
564        return inTransport.getQ2QHost()
565
566class HostDomain(ExtraArg):
567    def fromTransport(self, inTransport):
568        return inTransport.getQ2QHost().domain
569
570class HostUser(ExtraArg):
571    def fromTransport(self, inTransport):
572        return inTransport.getQ2QHost().resource
573
574
575
576class Boolean(Argument):
577    def fromString(self, inString):
578        if inString == 'True':
579            return True
580        elif inString == 'False':
581            return False
582        else:
583            raise RuntimeError("Bad boolean value: %r" % (inString,))
584
585    def toString(self, inObject):
586        if inObject:
587            return 'True'
588        else:
589            return 'False'
590
591
592class _CommandMeta(type):
593        def __new__(cls, name, bases, attrs):
594            re = attrs['reverseErrors'] = {}
595            er = attrs['allErrors'] = {}
596            for v, k in six.viewitems(attrs.get('errors',{})):
597                re[k] = v
598                er[v] = k
599            for v, k in six.viewitems(attrs.get('fatalErrors',{})):
600                re[k] = v
601                er[v] = k
602            return type.__new__(cls, name, bases, attrs)
603
604
605@six.add_metaclass(_CommandMeta)
606class Command:
607    arguments = []
608    response = []
609    extra = []
610    namespaces = [None]         # This is set to [None] on purpose: None means
611                                # "no namespace", not "empty list".  "empty
612                                # list" will make your command invalid in _all_
613                                # namespaces, effectively uncallable.
614    errors = {}
615    fatalErrors = {}
616
617    commandType = Box
618    responseType = Box
619
620    def commandName():
621        def get(self):
622            return self.__class__.__name__
623            raise NotImplementedError("Missing command name")
624        return get,
625    commandName = property(*commandName())
626
627    def __init__(self, **kw):
628        self.structured = kw
629        givenArgs = [normalizeKey(k) for k in kw.keys()]
630        forgotten = []
631        for name, arg in self.arguments:
632            if normalizeKey(name) not in givenArgs and not arg.optional:
633                forgotten.append(normalizeKey(name))
634#         for v in kw.itervalues():
635#             if v is None:
636#                 from pprint import pformat
637#                 raise RuntimeError("ARGH: %s" % pformat(kw))
638        if forgotten:
639            if len(forgotten) == 1:
640                plural = 'an argument'
641            else:
642                plural = 'some arguments'
643            raise RuntimeError("You forgot %s to %r: %s" % (
644                    plural, self.commandName, ', '.join(forgotten)))
645        forgotten = []
646
647    def makeResponse(cls, objects, proto):
648        try:
649            return objectsToStrings(objects, cls.response, cls.responseType(), proto)
650        except:
651            log.msg("Exception in %r.makeResponse" % (cls,))
652            raise
653    makeResponse = classmethod(makeResponse)
654
655    def do(self, proto, namespace=None, requiresAnswer=True):
656        if namespace is not None:
657            cmd = namespace + ":" + self.commandName
658        else:
659            cmd = self.commandName
660        def _massageError(error):
661            error.trap(RemoteJuiceError)
662            rje = error.value
663            return Failure(self.reverseErrors.get(rje.errorCode, UnhandledRemoteJuiceError)(rje.description))
664
665        d = proto.sendBoxCommand(
666            cmd, objectsToStrings(self.structured, self.arguments, self.commandType(),
667                                  proto),
668            requiresAnswer)
669
670        if requiresAnswer:
671            d.addCallback(stringsToObjects, self.response, proto)
672            d.addCallback(self.addExtra, proto.transport)
673            d.addErrback(_massageError)
674
675        return d
676
677    def addExtra(self, d, transport):
678        for name, extraArg in self.extra:
679            d[name] = extraArg.fromTransport(transport)
680        return d
681
682
683class ProtocolSwitchCommand(Command):
684    """Use this command to switch from something Juice-derived to a different
685    protocol mid-connection.  This can be useful to use juice as the
686    connection-startup negotiation phase.  Since TLS is a different layer
687    entirely, you can use Juice to negotiate the security parameters of your
688    connection, then switch to a different protocol, and the connection will
689    remain secured.
690    """
691
692    def __init__(self, __protoToSwitchToFactory, **kw):
693        self.protoToSwitchToFactory = __protoToSwitchToFactory
694        super(ProtocolSwitchCommand, self).__init__(**kw)
695
696    def makeResponse(cls, innerProto, proto):
697        return _SwitchBox(innerProto)
698
699    makeResponse = classmethod(makeResponse)
700
701    def do(self, proto, namespace=None):
702        d = super(ProtocolSwitchCommand, self).do(proto)
703        proto._lock()
704        def switchNow(ign):
705            innerProto = self.protoToSwitchToFactory.buildProtocol(proto.transport.getPeer())
706            proto._switchTo(innerProto, self.protoToSwitchToFactory)
707            return ign
708        def die(ign):
709            proto.transport.loseConnection()
710            return ign
711        def handle(ign):
712            self.protoToSwitchToFactory.clientConnectionFailed(None, Failure(CONNECTION_LOST))
713            return ign
714        return d.addCallbacks(switchNow, handle).addErrback(die)
715
716class Negotiate(Command):
717    commandName = 'Negotiate'
718
719    arguments = [('versions', ListOf(Integer()))]
720    response = [('version', Integer())]
721
722    responseType = NegotiateBox
723
724
725class Juice(LineReceiver, JuiceParserBase, object):
726    """
727    JUICE (JUice Is Concurrent Events) is a simple connection-oriented
728    request/response protocol.  Packets, or "boxes", are collections of
729    RFC2822-inspired headers, plus a body.  Note that this is NOT a literal
730    interpretation of any existing RFC, 822, 2822 or otherwise, but a simpler
731    version that does not do line continuations, does not specify any
732    particular format for header values, dispatches semantic meanings of most
733    headers on the -Command header rather than giving them global meaning, and
734    allows multiple sets of headers (messages, or JuiceBoxes) on a connection.
735
736    All headers whose names begin with a dash ('-') are reserved for use by the
737    protocol.  All others are for application use - their meaning depends on
738    the value of the "-Command" header.
739    """
740
741    protocolName = b'juice-base'
742
743    hostCertificate = None
744
745    MAX_LENGTH = 1024 * 1024
746
747    isServer = property(lambda self: self._issueGreeting,
748                        doc="""
749                        True if this is a juice server, e.g. it is going to
750                        issue or has issued a server greeting upon
751                        connection.
752                        """)
753
754    isClient = property(lambda self: not self._issueGreeting,
755                        doc="""
756                        True if this is a juice server, e.g. it is not going to
757                        issue or did not issue a server greeting upon
758                        connection.
759                        """)
760
761    def __init__(self, issueGreeting):
762        """
763        @param issueGreeting: whether to issue a greeting when connected.  This
764        should be set on server-side Juice protocols.
765        """
766        JuiceParserBase.__init__(self)
767        self._issueGreeting = issueGreeting
768
769    def __repr__(self):
770        return '<%s %s/%s at 0x%x>' % (self.__class__.__name__, self.isClient and 'client' or 'server', self.innerProtocol, id(self))
771
772    __locked = False
773
774    def _lock(self):
775        """ Lock this Juice instance so that no further Juice traffic may be sent.
776        This is used when sending a request to switch underlying protocols.
777        You probably want to subclass ProtocolSwitchCommand rather than calling
778        this directly.
779        """
780        self.__locked = True
781
782    innerProtocol = None
783
784    def _switchTo(self, newProto, clientFactory=None):
785        """ Switch this Juice instance to a new protocol.  You need to do this
786        'simultaneously' on both ends of a connection; the easiest way to do
787        this is to use a subclass of ProtocolSwitchCommand.
788        """
789
790        assert self.innerProtocol is None, "Protocol can only be safely switched once."
791        self.setRawMode()
792        self.innerProtocol = newProto
793        self.innerProtocolClientFactory = clientFactory
794        newProto.makeConnection(self.transport)
795
796    innerProtocolClientFactory = None
797
798    def juiceBoxReceived(self, box):
799        if self.__locked and COMMAND in box and ASK in box:
800            # This is a command which will trigger an answer, and we can no
801            # longer answer anything, so don't bother delivering it.
802            return
803        return super(Juice, self).juiceBoxReceived(box)
804
805    def sendPacket(self, completeBox):
806        """
807        Send a juice.Box to my peer.
808
809        Note: transport.write is never called outside of this method.
810        """
811        assert not self.__locked, "You cannot send juice packets when a connection is locked"
812        if self._startingTLSBuffer is not None:
813            self._startingTLSBuffer.append(completeBox)
814        else:
815            if debug:
816                log.msg("Juice send: %s" % pprint.pformat(dict(six.viewitems(completeBox))))
817            result = completeBox.serialize()
818            self.transport.write(result)
819
820    def sendCommand(self, command, __content='', __answer=True, **kw):
821        box = JuiceBox(__content, **kw)
822        return self.sendBoxCommand(command, box, requiresAnswer=__answer)
823
824    _outstandingRequests = None
825    _justStartedTLS = False
826
827    def makeConnection(self, transport):
828        self._transportPeer = transport.getPeer()
829        self._transportHost = transport.getHost()
830        log.msg("%s %s connection established (HOST:%s PEER:%s)" % (self.isClient and "client" or "server",
831                                                                    self.__class__.__name__,
832                                                                    self._transportHost,
833                                                                    self._transportPeer))
834        self._outstandingRequests = {}
835        self._requestBuffer = []
836        LineReceiver.makeConnection(self, transport)
837
838    _startingTLSBuffer = None
839
840    def prepareTLS(self):
841        self._startingTLSBuffer = []
842
843    def startTLS(self, certificate, *verifyAuthorities):
844        if self.hostCertificate is None:
845            self.hostCertificate = certificate
846            self._justStartedTLS = True
847            self.transport.startTLS(certificate.options(*verifyAuthorities))
848            stlsb = self._startingTLSBuffer
849            if stlsb is not None:
850                self._startingTLSBuffer = None
851                for box in stlsb:
852                    self.sendPacket(box)
853        else:
854            raise RuntimeError(
855                "Previously authenticated connection between %s and %s "
856                "is trying to re-establish as %s" % (
857                    self.hostCertificate,
858                    Certificate.peerFromTransport(self.transport),
859                    (certificate, verifyAuthorities)))
860
861    def dataReceived(self, data):
862        # If we successfully receive any data after TLS has been started, that
863        # means the connection was secured properly.  Make a note of that fact.
864        if self._justStartedTLS:
865            self._justStartedTLS = False
866        return LineReceiver.dataReceived(self, data)
867
868    def connectionLost(self, reason):
869        log.msg("%s %s connection lost (HOST:%s PEER:%s)" % (
870                self.isClient and 'client' or 'server',
871                self.__class__.__name__,
872                self._transportHost,
873                self._transportPeer))
874        self.failAllOutgoing(reason)
875        if self.innerProtocol is not None:
876            self.innerProtocol.connectionLost(reason)
877            if self.innerProtocolClientFactory is not None:
878                self.innerProtocolClientFactory.clientConnectionLost(None, reason)
879
880    def lineReceived(self, line):
881        if line:
882            self._requestBuffer.append(line)
883        else:
884            buf = self._requestBuffer
885            self._requestBuffer = []
886            bodylen, b = parseJuiceHeaders(buf)
887            if bodylen:
888                self._bodyRemaining = bodylen
889                self._bodyBuffer = []
890                self._pendingBox = b
891                self.setRawMode()
892            else:
893                self.juiceBoxReceived(b)
894
895    def rawDataReceived(self, data):
896        if self.innerProtocol is not None:
897            self.innerProtocol.dataReceived(data)
898            return
899        self._bodyRemaining -= len(data)
900        if self._bodyRemaining <= 0:
901            if self._bodyRemaining < 0:
902                self._bodyBuffer.append(data[:self._bodyRemaining])
903                extraData = data[self._bodyRemaining:]
904            else:
905                self._bodyBuffer.append(data)
906                extraData = ''
907            self._pendingBox['body'] = six.ensure_str(b''.join(six.ensure_binary(each) for each in self._bodyBuffer))
908            self._bodyBuffer = None
909            b, self._pendingBox = self._pendingBox, None
910            self.juiceBoxReceived(b)
911            if self.innerProtocol is not None:
912                self.innerProtocol.makeConnection(self.transport)
913                if extraData:
914                    self.innerProtocol.dataReceived(extraData)
915            else:
916                self.setLineMode(extraData)
917        else:
918            self._bodyBuffer.append(data)
919
920    protocolVersion = 0
921
922    def _setProtocolVersion(self, version):
923        # if we ever want to actually mangle encodings, this is the place to do
924        # it!
925        self.protocolVersion = version
926        return version
927
928    def renegotiateVersion(self, newVersion):
929        assert newVersion in VERSIONS, (
930            "This side of the connection doesn't support version %r"
931            % (newVersion,))
932        v = VERSIONS[:]
933        v.remove(newVersion)
934        return Negotiate(versions=[newVersion]).do(self).addCallback(
935            lambda ver: self._setProtocolVersion(ver['version']))
936
937    def command_NEGOTIATE(self, versions):
938        for version in versions:
939            if version in VERSIONS:
940                return dict(version=version)
941        raise IncompatibleVersions()
942    command_NEGOTIATE.command = Negotiate
943
944
945VERSIONS = [1]
946
947class _ParserHelper(Juice):
948    def __init__(self):
949        Juice.__init__(self, False)
950        self.boxes = []
951        self.results = Deferred()
952
953    def getPeer(self):
954        return 'string'
955
956    def getHost(self):
957        return 'string'
958
959    disconnecting = False
960
961    def juiceBoxReceived(self, box):
962        self.boxes.append(box)
963
964    # Synchronous helpers
965    def parse(cls, fileObj):
966        p = cls()
967        p.makeConnection(p)
968        p.dataReceived(fileObj.read())
969        return p.boxes
970    parse = classmethod(parse)
971
972    def parseString(cls, data):
973        with io.BytesIO(data) as f:
974            return cls.parse(f)
975    parseString = classmethod(parseString)
976
977parse = _ParserHelper.parse
978parseString = _ParserHelper.parseString
979
980def stringsToObjects(strings, arglist, proto):
981    objects = {}
982    myStrings = strings.copy()
983    for argname, argparser in arglist:
984        argparser.fromBox(argname, myStrings, objects, proto)
985    return objects
986
987def objectsToStrings(objects, arglist, strings, proto):
988    myObjects = {}
989    for (k, v) in objects.items():
990        myObjects[normalizeKey(k)] = v
991
992    for argname, argparser in arglist:
993        argparser.toBox(argname, strings, myObjects, proto)
994    return strings
995
996class JuiceServerFactory(ServerFactory):
997    protocol = Juice
998    def buildProtocol(self, addr):
999        prot = self.protocol(True)
1000        prot.factory = self
1001        return prot
1002
1003class JuiceClientFactory(ClientFactory):
1004    protocol = Juice
1005    def buildProtocol(self, addr):
1006        prot = self.protocol(False)
1007        prot.factory = self
1008        return prot
1009
1010