1# -*- test-case-name: epsilon.test.test_juice -*- 2# Copyright 2005 Divmod, Inc. See LICENSE file for details 3 4import warnings, pprint 5import keyword 6import io 7 8import six 9from twisted.internet.main import CONNECTION_LOST 10from twisted.internet.defer import Deferred, maybeDeferred, fail 11from twisted.internet.protocol import ServerFactory, ClientFactory 12from twisted.internet.ssl import Certificate 13from twisted.python.failure import Failure 14from twisted.python import log, filepath 15 16from epsilon.liner import LineReceiver 17from epsilon.compat import long 18from epsilon import extime 19 20ASK = '_ask' 21ANSWER = '_answer' 22COMMAND = '_command' 23ERROR = '_error' 24ERROR_CODE = '_error_code' 25ERROR_DESCRIPTION = '_error_description' 26LENGTH = '_length' 27BODY = 'body' 28 29debug = False 30 31class JuiceBox(dict): 32 """ I am a packet in the JUICE protocol. """ 33 34 def __init__(self, __body='', **kw): 35 self.update(kw) 36 if __body: 37 assert isinstance(__body, str), "body must be a string: %r" % ( repr(__body),) 38 self['body'] = __body 39 40 def body(): 41 def get(self): 42 warnings.warn("body attribute of boxes is now just a regular field", 43 stacklevel=2) 44 return self['body'] 45 def set(self, newbody): 46 warnings.warn("body attribute of boxes is now just a regular field", 47 stacklevel=2) 48 self['body'] = newbody 49 return get,set 50 body = property(*body()) 51 52 def copy(self): 53 newBox = self.__class__() 54 newBox.update(self) 55 return newBox 56 57 def serialize(self, 58 delimiter=b'\r\n', 59 escaped=b'\r\n '): 60 assert LENGTH not in self 61 delimiter = six.ensure_binary(delimiter) 62 escaped = six.ensure_binary(escaped) 63 64 L = [] 65 for (k, v) in six.viewitems(self): 66 if k == BODY: 67 k = LENGTH 68 v = str(len(self[BODY])) 69 L.append(six.ensure_binary(k).replace(b'_', b'-').title()) 70 L.append(b': ') 71 L.append(six.ensure_binary(v).replace(delimiter, escaped)) 72 L.append(delimiter) 73 74 L.append(delimiter) 75 if BODY in self: 76 L.append(six.ensure_binary(self[BODY])) 77 return b''.join(L) 78 79 def sendTo(self, proto): 80 """ 81 Serialize and send this box to a Juice instance. By the time it is 82 being sent, several keys are required. I must have exactly ONE of:: 83 84 -ask 85 -answer 86 -error 87 88 If the '-ask' header is set, then the '-command' header must also be 89 set. 90 """ 91 proto.sendPacket(self) 92 93# juice.Box => JuiceBox 94 95Box = JuiceBox 96 97class TLSBox(JuiceBox): 98 def __repr__(self): 99 return 'TLS(**%s)' % (super(TLSBox, self).__repr__(),) 100 101 102 def __init__(self, __certificate, __verify=None, __sslstarted=None, **kw): 103 super(TLSBox, self).__init__(**kw) 104 self.certificate = __certificate 105 self.verify = __verify 106 self.sslstarted = __sslstarted 107 108 def sendTo(self, proto): 109 super(TLSBox, self).sendTo(proto) 110 if self.verify is None: 111 proto.startTLS(self.certificate) 112 else: 113 proto.startTLS(self.certificate, self.verify) 114 if self.sslstarted is not None: 115 self.sslstarted() 116 117class QuitBox(JuiceBox): 118 def __repr__(self): 119 return 'Quit(**%s)' % (super(QuitBox, self).__repr__(),) 120 121 122 def sendTo(self, proto): 123 super(QuitBox, self).sendTo(proto) 124 proto.transport.loseConnection() 125 126class _SwitchBox(JuiceBox): 127 def __repr__(self): 128 return 'Switch(**%s)' % (super(_SwitchBox, self).__repr__(),) 129 130 131 def __init__(self, __proto, **kw): 132 super(_SwitchBox, self).__init__(**kw) 133 self.innerProto = __proto 134 135 def sendTo(self, proto): 136 super(_SwitchBox, self).sendTo(proto) 137 proto._switchTo(self.innerProto) 138 139 140 141class NegotiateBox(JuiceBox): 142 def __repr__(self): 143 return 'Negotiate(**%s)' % (super(NegotiateBox, self).__repr__(),) 144 145 146 def sendTo(self, proto): 147 super(NegotiateBox, self).sendTo(proto) 148 proto._setProtocolVersion(int(self['version'])) 149 150 151 152class JuiceError(Exception): 153 pass 154 155class RemoteJuiceError(JuiceError): 156 """ 157 This error indicates that something went wrong on the remote end of the 158 connection, and the error was serialized and transmitted to you. 159 """ 160 def __init__(self, errorCode, description, fatal=False): 161 """Create a remote error with an error code and description. 162 """ 163 Exception.__init__(self, "Remote[%s]: %s" % (errorCode, description)) 164 self.errorCode = errorCode 165 self.description = description 166 self.fatal = fatal 167 168class UnhandledRemoteJuiceError(RemoteJuiceError): 169 def __init__(self, description): 170 errorCode = b"UNHANDLED" 171 RemoteJuiceError.__init__(self, errorCode, description) 172 173class JuiceBoxError(JuiceError): 174 pass 175 176class MalformedJuiceBox(JuiceBoxError): 177 pass 178 179class UnhandledCommand(JuiceError): 180 pass 181 182 183class IncompatibleVersions(JuiceError): 184 pass 185 186class _Transactor: 187 def __init__(self, store, callable): 188 self.store = store 189 self.callable = callable 190 191 def __call__(self, box): 192 return self.store.transact(self.callable, box) 193 194 def __repr__(self): 195 return '<Transaction in: %s of: %s>' % (self.store, self.callable) 196 197class DispatchMixin: 198 baseDispatchPrefix = 'juice_' 199 autoDispatchPrefix = 'command_' 200 201 wrapper = None 202 203 def _auto(self, aCallable, proto, namespace=None): 204 if aCallable is None: 205 return None 206 command = aCallable.command 207 if namespace not in command.namespaces: 208 # if you're in the wrong namespace, you are very likely not allowed 209 # to invoke the command you are trying to invoke. some objects 210 # have commands exposed in a separate namespace for security 211 # reasons, since the security model is a role : namespace mapping. 212 log.msg('WRONG NAMESPACE: %r, %r' % (namespace, command.namespaces)) 213 return None 214 def doit(box): 215 kw = stringsToObjects(box, command.arguments, proto) 216 for name, extraArg in command.extra: 217 kw[name] = extraArg.fromTransport(proto.transport) 218# def checkIsDict(result): 219# if not isinstance(result, dict): 220# raise RuntimeError("%r returned %r, not dictionary" % ( 221# aCallable, result)) 222# return result 223 def checkKnownErrors(error): 224 key = error.trap(*command.allErrors) 225 code = command.allErrors[key] 226 desc = str(error.value) 227 return Failure(RemoteJuiceError( 228 code, desc, error in command.fatalErrors)) 229 return maybeDeferred(aCallable, **kw).addCallback( 230 command.makeResponse, proto).addErrback( 231 checkKnownErrors) 232 return doit 233 234 def _wrap(self, aCallable): 235 if aCallable is None: 236 return None 237 wrap = self.wrapper 238 if wrap is not None: 239 return wrap(aCallable) 240 else: 241 return aCallable 242 243 def normalizeCommand(self, cmd): 244 """Return the canonical form of a command. 245 """ 246 return cmd.upper().strip().replace('-', '_') 247 248 def lookupFunction(self, proto, name, namespace): 249 """Return a callable to invoke when executing the named command. 250 """ 251 # Try to find a method to be invoked in a transaction first 252 # Otherwise fallback to a "regular" method 253 fName = self.autoDispatchPrefix + name 254 fObj = getattr(self, fName, None) 255 if fObj is not None: 256 # pass the namespace along 257 return self._auto(fObj, proto, namespace) 258 259 assert namespace is None, 'Old-style parsing' 260 # Fall back to simplistic command dispatching - we probably want to get 261 # rid of this eventually, there's no reason to do extra work and write 262 # fewer docs all the time. 263 fName = self.baseDispatchPrefix + name 264 return getattr(self, fName, None) 265 266 def dispatchCommand(self, proto, cmd, box, namespace=None): 267 fObj = self.lookupFunction(proto, self.normalizeCommand(cmd), namespace) 268 if fObj is None: 269 return fail(UnhandledCommand(cmd)) 270 return maybeDeferred(self._wrap(fObj), box) 271 272def normalizeKey(key): 273 lkey = six.ensure_str(key).lower().replace('-', '_') 274 if keyword.iskeyword(lkey): 275 return lkey.title() 276 return lkey 277 278 279def parseJuiceHeaders(lines): 280 """ 281 Create a JuiceBox from a list of header lines. 282 283 @param lines: a list of lines. 284 @type lines: a list of L{bytes} 285 """ 286 b = JuiceBox() 287 key = None 288 for L in lines: 289 if L[0:1] == b' ': 290 # continuation 291 assert key is not None 292 b[key] += six.ensure_str(b'\r\n' + L[1:]) 293 continue 294 parts = L.split(b': ', 1) 295 if len(parts) != 2: 296 raise MalformedJuiceBox("Wrong number of parts: %r" % (L,)) 297 key, value = parts 298 key = normalizeKey(key) 299 b[key] = six.ensure_str(value) 300 return int(b.pop(LENGTH, 0)), b 301 302class JuiceParserBase(DispatchMixin): 303 304 def __init__(self): 305 self._outstandingRequests = {} 306 307 def _puke(self, failure): 308 log.msg("Juice server or network failure " 309 "unhandled by client application:") 310 log.err(failure) 311 log.msg( 312 "Dropping connection! " 313 "To avoid, add errbacks to ALL remote commands!") 314 if self.transport is not None: 315 self.transport.loseConnection() 316 317 _counter = long(0) 318 319 def _nextTag(self): 320 self._counter += 1 321 return '%x' % (self._counter,) 322 323 def failAllOutgoing(self, reason): 324 OR = self._outstandingRequests.items() 325 self._outstandingRequests = None # we can never send another request 326 for key, value in OR: 327 value.errback(reason) 328 329 def juiceBoxReceived(self, box): 330 if debug: 331 log.msg("Juice receive: %s" % pprint.pformat(dict(six.viewitems(box)))) 332 333 if ANSWER in box: 334 question = self._outstandingRequests.pop(box[ANSWER]) 335 question.addErrback(self._puke) 336 self._wrap(question.callback)(box) 337 elif ERROR in box: 338 question = self._outstandingRequests.pop(box[ERROR]) 339 question.addErrback(self._puke) 340 self._wrap(question.errback)( 341 Failure(RemoteJuiceError(box[ERROR_CODE], 342 box[ERROR_DESCRIPTION]))) 343 elif COMMAND in box: 344 cmd = box[COMMAND] 345 def sendAnswer(answerBox): 346 if ASK not in box: 347 return 348 if self.transport is None: 349 return 350 answerBox[ANSWER] = box[ASK] 351 answerBox.sendTo(self) 352 def sendError(error): 353 if ASK not in box: 354 return error 355 if error.check(RemoteJuiceError): 356 code = error.value.errorCode 357 desc = error.value.description 358 if error.value.fatal: 359 errorBox = QuitBox() 360 else: 361 errorBox = JuiceBox() 362 else: 363 errorBox = QuitBox() 364 log.err(error) # here is where server-side logging happens 365 # if the error isn't handled 366 code = 'UNHANDLED' 367 desc = "Unhandled Remote System Exception " 368 errorBox[ERROR] = box[ASK] 369 errorBox[ERROR_DESCRIPTION] = desc 370 errorBox[ERROR_CODE] = code 371 if self.transport is not None: 372 errorBox.sendTo(self) 373 return None # intentionally stop the error here: don't log the 374 # traceback if it's handled, do log it (earlier) if 375 # it isn't 376 self.dispatchCommand(self, cmd, box).addCallbacks(sendAnswer, sendError 377 ).addErrback(self._puke) 378 else: 379 raise RuntimeError( 380 "Empty packet received over connection-oriented juice: %r" % (box,)) 381 382 def sendBoxCommand(self, command, box, requiresAnswer=True): 383 """ 384 Send a command across the wire with the given C{juice.Box}. 385 386 Returns a Deferred which fires with the response C{juice.Box} when it 387 is received, or fails with a C{juice.RemoteJuiceError} if an error is 388 received. 389 390 If the Deferred fails and the error is not handled by the caller of 391 this method, the failure will be logged and the connection dropped. 392 """ 393 if self._outstandingRequests is None: 394 return fail(CONNECTION_LOST) 395 box[COMMAND] = command 396 tag = self._nextTag() 397 if requiresAnswer: 398 box[ASK] = tag 399 result = self._outstandingRequests[tag] = Deferred() 400 else: 401 result = None 402 box.sendTo(self) 403 return result 404 405 406 407 408 409 410class Argument: 411 optional = False 412 413 def __init__(self, optional=False): 414 self.optional = optional 415 416 def retrieve(self, d, name): 417 if self.optional: 418 value = d.get(name) 419 if value is not None: 420 del d[name] 421 else: 422 value = d.pop(name) 423 return value 424 425 def fromBox(self, name, strings, objects, proto): 426 st = self.retrieve(strings, name) 427 if self.optional and st is None: 428 objects[name] = None 429 else: 430 objects[name] = self.fromStringProto(st, proto) 431 432 def toBox(self, name, strings, objects, proto): 433 obj = self.retrieve(objects, name) 434 if self.optional and obj is None: 435 # strings[name] = None 436 return 437 else: 438 strings[name] = self.toStringProto(obj, proto) 439 440 def fromStringProto(self, inString, proto): 441 return self.fromString(inString) 442 443 def toStringProto(self, inObject, proto): 444 return self.toString(inObject) 445 446 def fromString(self, inString): 447 raise NotImplementedError() 448 449 def toString(self, inObject): 450 raise NotImplementedError() 451 452class JuiceList(Argument): 453 def __init__(self, subargs): 454 self.subargs = subargs 455 456 def fromStringProto(self, inString, proto): 457 boxes = parseString(six.ensure_binary(inString)) 458 values = [stringsToObjects(box, self.subargs, proto) 459 for box in boxes] 460 return values 461 462 def toStringProto(self, inObject, proto): 463 return b''.join([ 464 objectsToStrings(objects, self.subargs, Box(), proto).serialize() 465 for objects in inObject 466 ]) 467 468class ListOf(Argument): 469 def __init__(self, subarg, delimiter=', '): 470 self.subarg = subarg 471 self.delimiter = delimiter 472 473 def fromStringProto(self, inString, proto): 474 strings = inString.split(self.delimiter) 475 L = [self.subarg.fromStringProto(string, proto) 476 for string in strings] 477 return L 478 479 def toStringProto(self, inObject, proto): 480 L = [] 481 for inSingle in inObject: 482 outString = self.subarg.toStringProto(inSingle, proto) 483 assert self.delimiter not in outString 484 L.append(outString) 485 return self.delimiter.join(L) 486 487class Integer(Argument): 488 fromString = int 489 def toString(self, inObject): 490 return str(int(inObject)) 491 492class String(Argument): 493 def toString(self, inObject): 494 return inObject 495 def fromString(self, inString): 496 return inString 497 498class EncodedString(Argument): 499 500 def __init__(self, encoding): 501 self.encoding = encoding 502 503 def toString(self, inObject): 504 return inObject.encode(self.encoding) 505 506 def fromString(self, inString): 507 return inString.decode(self.encoding) 508 509# Temporary backwards compatibility for Exponent 510 511Body = String 512 513class Unicode(String): 514 def toString(self, inObject): 515 # assert isinstance(inObject, unicode) 516 return String.toString(self, inObject.encode('utf-8')) 517 518 def fromString(self, inString): 519 # assert isinstance(inString, str) 520 return String.fromString(self, inString).decode('utf-8') 521 522class Path(Unicode): 523 def fromString(self, inString): 524 return filepath.FilePath(Unicode.fromString(self, inString)) 525 526 def toString(self, inObject): 527 return Unicode.toString(self, inObject.path) 528 529 530class Float(Argument): 531 fromString = float 532 toString = str 533 534class Base64Binary(Argument): 535 def toString(self, inObject): 536 return inObject.encode('base64').replace('\n', '') 537 def fromString(self, inString): 538 return inString.decode('base64') 539 540class Time(Argument): 541 def toString(self, inObject): 542 return inObject.asISO8601TimeAndDate() 543 def fromString(self, inString): 544 return extime.Time.fromISO8601TimeAndDate(inString) 545 546class ExtraArg: 547 def fromTransport(self, inTransport): 548 raise NotImplementedError() 549 550class Peer(ExtraArg): 551 def fromTransport(self, inTransport): 552 return inTransport.getQ2QPeer() 553 554class PeerDomain(ExtraArg): 555 def fromTransport(self, inTransport): 556 return inTransport.getQ2QPeer().domain 557 558class PeerUser(ExtraArg): 559 def fromTransport(self, inTransport): 560 return inTransport.getQ2QPeer().resource 561 562class Host(ExtraArg): 563 def fromTransport(self, inTransport): 564 return inTransport.getQ2QHost() 565 566class HostDomain(ExtraArg): 567 def fromTransport(self, inTransport): 568 return inTransport.getQ2QHost().domain 569 570class HostUser(ExtraArg): 571 def fromTransport(self, inTransport): 572 return inTransport.getQ2QHost().resource 573 574 575 576class Boolean(Argument): 577 def fromString(self, inString): 578 if inString == 'True': 579 return True 580 elif inString == 'False': 581 return False 582 else: 583 raise RuntimeError("Bad boolean value: %r" % (inString,)) 584 585 def toString(self, inObject): 586 if inObject: 587 return 'True' 588 else: 589 return 'False' 590 591 592class _CommandMeta(type): 593 def __new__(cls, name, bases, attrs): 594 re = attrs['reverseErrors'] = {} 595 er = attrs['allErrors'] = {} 596 for v, k in six.viewitems(attrs.get('errors',{})): 597 re[k] = v 598 er[v] = k 599 for v, k in six.viewitems(attrs.get('fatalErrors',{})): 600 re[k] = v 601 er[v] = k 602 return type.__new__(cls, name, bases, attrs) 603 604 605@six.add_metaclass(_CommandMeta) 606class Command: 607 arguments = [] 608 response = [] 609 extra = [] 610 namespaces = [None] # This is set to [None] on purpose: None means 611 # "no namespace", not "empty list". "empty 612 # list" will make your command invalid in _all_ 613 # namespaces, effectively uncallable. 614 errors = {} 615 fatalErrors = {} 616 617 commandType = Box 618 responseType = Box 619 620 def commandName(): 621 def get(self): 622 return self.__class__.__name__ 623 raise NotImplementedError("Missing command name") 624 return get, 625 commandName = property(*commandName()) 626 627 def __init__(self, **kw): 628 self.structured = kw 629 givenArgs = [normalizeKey(k) for k in kw.keys()] 630 forgotten = [] 631 for name, arg in self.arguments: 632 if normalizeKey(name) not in givenArgs and not arg.optional: 633 forgotten.append(normalizeKey(name)) 634# for v in kw.itervalues(): 635# if v is None: 636# from pprint import pformat 637# raise RuntimeError("ARGH: %s" % pformat(kw)) 638 if forgotten: 639 if len(forgotten) == 1: 640 plural = 'an argument' 641 else: 642 plural = 'some arguments' 643 raise RuntimeError("You forgot %s to %r: %s" % ( 644 plural, self.commandName, ', '.join(forgotten))) 645 forgotten = [] 646 647 def makeResponse(cls, objects, proto): 648 try: 649 return objectsToStrings(objects, cls.response, cls.responseType(), proto) 650 except: 651 log.msg("Exception in %r.makeResponse" % (cls,)) 652 raise 653 makeResponse = classmethod(makeResponse) 654 655 def do(self, proto, namespace=None, requiresAnswer=True): 656 if namespace is not None: 657 cmd = namespace + ":" + self.commandName 658 else: 659 cmd = self.commandName 660 def _massageError(error): 661 error.trap(RemoteJuiceError) 662 rje = error.value 663 return Failure(self.reverseErrors.get(rje.errorCode, UnhandledRemoteJuiceError)(rje.description)) 664 665 d = proto.sendBoxCommand( 666 cmd, objectsToStrings(self.structured, self.arguments, self.commandType(), 667 proto), 668 requiresAnswer) 669 670 if requiresAnswer: 671 d.addCallback(stringsToObjects, self.response, proto) 672 d.addCallback(self.addExtra, proto.transport) 673 d.addErrback(_massageError) 674 675 return d 676 677 def addExtra(self, d, transport): 678 for name, extraArg in self.extra: 679 d[name] = extraArg.fromTransport(transport) 680 return d 681 682 683class ProtocolSwitchCommand(Command): 684 """Use this command to switch from something Juice-derived to a different 685 protocol mid-connection. This can be useful to use juice as the 686 connection-startup negotiation phase. Since TLS is a different layer 687 entirely, you can use Juice to negotiate the security parameters of your 688 connection, then switch to a different protocol, and the connection will 689 remain secured. 690 """ 691 692 def __init__(self, __protoToSwitchToFactory, **kw): 693 self.protoToSwitchToFactory = __protoToSwitchToFactory 694 super(ProtocolSwitchCommand, self).__init__(**kw) 695 696 def makeResponse(cls, innerProto, proto): 697 return _SwitchBox(innerProto) 698 699 makeResponse = classmethod(makeResponse) 700 701 def do(self, proto, namespace=None): 702 d = super(ProtocolSwitchCommand, self).do(proto) 703 proto._lock() 704 def switchNow(ign): 705 innerProto = self.protoToSwitchToFactory.buildProtocol(proto.transport.getPeer()) 706 proto._switchTo(innerProto, self.protoToSwitchToFactory) 707 return ign 708 def die(ign): 709 proto.transport.loseConnection() 710 return ign 711 def handle(ign): 712 self.protoToSwitchToFactory.clientConnectionFailed(None, Failure(CONNECTION_LOST)) 713 return ign 714 return d.addCallbacks(switchNow, handle).addErrback(die) 715 716class Negotiate(Command): 717 commandName = 'Negotiate' 718 719 arguments = [('versions', ListOf(Integer()))] 720 response = [('version', Integer())] 721 722 responseType = NegotiateBox 723 724 725class Juice(LineReceiver, JuiceParserBase, object): 726 """ 727 JUICE (JUice Is Concurrent Events) is a simple connection-oriented 728 request/response protocol. Packets, or "boxes", are collections of 729 RFC2822-inspired headers, plus a body. Note that this is NOT a literal 730 interpretation of any existing RFC, 822, 2822 or otherwise, but a simpler 731 version that does not do line continuations, does not specify any 732 particular format for header values, dispatches semantic meanings of most 733 headers on the -Command header rather than giving them global meaning, and 734 allows multiple sets of headers (messages, or JuiceBoxes) on a connection. 735 736 All headers whose names begin with a dash ('-') are reserved for use by the 737 protocol. All others are for application use - their meaning depends on 738 the value of the "-Command" header. 739 """ 740 741 protocolName = b'juice-base' 742 743 hostCertificate = None 744 745 MAX_LENGTH = 1024 * 1024 746 747 isServer = property(lambda self: self._issueGreeting, 748 doc=""" 749 True if this is a juice server, e.g. it is going to 750 issue or has issued a server greeting upon 751 connection. 752 """) 753 754 isClient = property(lambda self: not self._issueGreeting, 755 doc=""" 756 True if this is a juice server, e.g. it is not going to 757 issue or did not issue a server greeting upon 758 connection. 759 """) 760 761 def __init__(self, issueGreeting): 762 """ 763 @param issueGreeting: whether to issue a greeting when connected. This 764 should be set on server-side Juice protocols. 765 """ 766 JuiceParserBase.__init__(self) 767 self._issueGreeting = issueGreeting 768 769 def __repr__(self): 770 return '<%s %s/%s at 0x%x>' % (self.__class__.__name__, self.isClient and 'client' or 'server', self.innerProtocol, id(self)) 771 772 __locked = False 773 774 def _lock(self): 775 """ Lock this Juice instance so that no further Juice traffic may be sent. 776 This is used when sending a request to switch underlying protocols. 777 You probably want to subclass ProtocolSwitchCommand rather than calling 778 this directly. 779 """ 780 self.__locked = True 781 782 innerProtocol = None 783 784 def _switchTo(self, newProto, clientFactory=None): 785 """ Switch this Juice instance to a new protocol. You need to do this 786 'simultaneously' on both ends of a connection; the easiest way to do 787 this is to use a subclass of ProtocolSwitchCommand. 788 """ 789 790 assert self.innerProtocol is None, "Protocol can only be safely switched once." 791 self.setRawMode() 792 self.innerProtocol = newProto 793 self.innerProtocolClientFactory = clientFactory 794 newProto.makeConnection(self.transport) 795 796 innerProtocolClientFactory = None 797 798 def juiceBoxReceived(self, box): 799 if self.__locked and COMMAND in box and ASK in box: 800 # This is a command which will trigger an answer, and we can no 801 # longer answer anything, so don't bother delivering it. 802 return 803 return super(Juice, self).juiceBoxReceived(box) 804 805 def sendPacket(self, completeBox): 806 """ 807 Send a juice.Box to my peer. 808 809 Note: transport.write is never called outside of this method. 810 """ 811 assert not self.__locked, "You cannot send juice packets when a connection is locked" 812 if self._startingTLSBuffer is not None: 813 self._startingTLSBuffer.append(completeBox) 814 else: 815 if debug: 816 log.msg("Juice send: %s" % pprint.pformat(dict(six.viewitems(completeBox)))) 817 result = completeBox.serialize() 818 self.transport.write(result) 819 820 def sendCommand(self, command, __content='', __answer=True, **kw): 821 box = JuiceBox(__content, **kw) 822 return self.sendBoxCommand(command, box, requiresAnswer=__answer) 823 824 _outstandingRequests = None 825 _justStartedTLS = False 826 827 def makeConnection(self, transport): 828 self._transportPeer = transport.getPeer() 829 self._transportHost = transport.getHost() 830 log.msg("%s %s connection established (HOST:%s PEER:%s)" % (self.isClient and "client" or "server", 831 self.__class__.__name__, 832 self._transportHost, 833 self._transportPeer)) 834 self._outstandingRequests = {} 835 self._requestBuffer = [] 836 LineReceiver.makeConnection(self, transport) 837 838 _startingTLSBuffer = None 839 840 def prepareTLS(self): 841 self._startingTLSBuffer = [] 842 843 def startTLS(self, certificate, *verifyAuthorities): 844 if self.hostCertificate is None: 845 self.hostCertificate = certificate 846 self._justStartedTLS = True 847 self.transport.startTLS(certificate.options(*verifyAuthorities)) 848 stlsb = self._startingTLSBuffer 849 if stlsb is not None: 850 self._startingTLSBuffer = None 851 for box in stlsb: 852 self.sendPacket(box) 853 else: 854 raise RuntimeError( 855 "Previously authenticated connection between %s and %s " 856 "is trying to re-establish as %s" % ( 857 self.hostCertificate, 858 Certificate.peerFromTransport(self.transport), 859 (certificate, verifyAuthorities))) 860 861 def dataReceived(self, data): 862 # If we successfully receive any data after TLS has been started, that 863 # means the connection was secured properly. Make a note of that fact. 864 if self._justStartedTLS: 865 self._justStartedTLS = False 866 return LineReceiver.dataReceived(self, data) 867 868 def connectionLost(self, reason): 869 log.msg("%s %s connection lost (HOST:%s PEER:%s)" % ( 870 self.isClient and 'client' or 'server', 871 self.__class__.__name__, 872 self._transportHost, 873 self._transportPeer)) 874 self.failAllOutgoing(reason) 875 if self.innerProtocol is not None: 876 self.innerProtocol.connectionLost(reason) 877 if self.innerProtocolClientFactory is not None: 878 self.innerProtocolClientFactory.clientConnectionLost(None, reason) 879 880 def lineReceived(self, line): 881 if line: 882 self._requestBuffer.append(line) 883 else: 884 buf = self._requestBuffer 885 self._requestBuffer = [] 886 bodylen, b = parseJuiceHeaders(buf) 887 if bodylen: 888 self._bodyRemaining = bodylen 889 self._bodyBuffer = [] 890 self._pendingBox = b 891 self.setRawMode() 892 else: 893 self.juiceBoxReceived(b) 894 895 def rawDataReceived(self, data): 896 if self.innerProtocol is not None: 897 self.innerProtocol.dataReceived(data) 898 return 899 self._bodyRemaining -= len(data) 900 if self._bodyRemaining <= 0: 901 if self._bodyRemaining < 0: 902 self._bodyBuffer.append(data[:self._bodyRemaining]) 903 extraData = data[self._bodyRemaining:] 904 else: 905 self._bodyBuffer.append(data) 906 extraData = '' 907 self._pendingBox['body'] = six.ensure_str(b''.join(six.ensure_binary(each) for each in self._bodyBuffer)) 908 self._bodyBuffer = None 909 b, self._pendingBox = self._pendingBox, None 910 self.juiceBoxReceived(b) 911 if self.innerProtocol is not None: 912 self.innerProtocol.makeConnection(self.transport) 913 if extraData: 914 self.innerProtocol.dataReceived(extraData) 915 else: 916 self.setLineMode(extraData) 917 else: 918 self._bodyBuffer.append(data) 919 920 protocolVersion = 0 921 922 def _setProtocolVersion(self, version): 923 # if we ever want to actually mangle encodings, this is the place to do 924 # it! 925 self.protocolVersion = version 926 return version 927 928 def renegotiateVersion(self, newVersion): 929 assert newVersion in VERSIONS, ( 930 "This side of the connection doesn't support version %r" 931 % (newVersion,)) 932 v = VERSIONS[:] 933 v.remove(newVersion) 934 return Negotiate(versions=[newVersion]).do(self).addCallback( 935 lambda ver: self._setProtocolVersion(ver['version'])) 936 937 def command_NEGOTIATE(self, versions): 938 for version in versions: 939 if version in VERSIONS: 940 return dict(version=version) 941 raise IncompatibleVersions() 942 command_NEGOTIATE.command = Negotiate 943 944 945VERSIONS = [1] 946 947class _ParserHelper(Juice): 948 def __init__(self): 949 Juice.__init__(self, False) 950 self.boxes = [] 951 self.results = Deferred() 952 953 def getPeer(self): 954 return 'string' 955 956 def getHost(self): 957 return 'string' 958 959 disconnecting = False 960 961 def juiceBoxReceived(self, box): 962 self.boxes.append(box) 963 964 # Synchronous helpers 965 def parse(cls, fileObj): 966 p = cls() 967 p.makeConnection(p) 968 p.dataReceived(fileObj.read()) 969 return p.boxes 970 parse = classmethod(parse) 971 972 def parseString(cls, data): 973 with io.BytesIO(data) as f: 974 return cls.parse(f) 975 parseString = classmethod(parseString) 976 977parse = _ParserHelper.parse 978parseString = _ParserHelper.parseString 979 980def stringsToObjects(strings, arglist, proto): 981 objects = {} 982 myStrings = strings.copy() 983 for argname, argparser in arglist: 984 argparser.fromBox(argname, myStrings, objects, proto) 985 return objects 986 987def objectsToStrings(objects, arglist, strings, proto): 988 myObjects = {} 989 for (k, v) in objects.items(): 990 myObjects[normalizeKey(k)] = v 991 992 for argname, argparser in arglist: 993 argparser.toBox(argname, strings, myObjects, proto) 994 return strings 995 996class JuiceServerFactory(ServerFactory): 997 protocol = Juice 998 def buildProtocol(self, addr): 999 prot = self.protocol(True) 1000 prot.factory = self 1001 return prot 1002 1003class JuiceClientFactory(ClientFactory): 1004 protocol = Juice 1005 def buildProtocol(self, addr): 1006 prot = self.protocol(False) 1007 prot.factory = self 1008 return prot 1009 1010