1############################################################################### 2## 3## Copyright (C) 2011-2013 Tavendo GmbH 4## 5## Licensed under the Apache License, Version 2.0 (the "License"); 6## you may not use this file except in compliance with the License. 7## You may obtain a copy of the License at 8## 9## http://www.apache.org/licenses/LICENSE-2.0 10## 11## Unless required by applicable law or agreed to in writing, software 12## distributed under the License is distributed on an "AS IS" BASIS, 13## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14## See the License for the specific language governing permissions and 15## limitations under the License. 16## 17############################################################################### 18 19from __future__ import absolute_import 20 21import sys 22PY3 = sys.version_info >= (3,) 23 24 25__all__ = ("WampProtocol", 26 "WampFactory", 27 "WampServerProtocol", 28 "WampServerFactory", 29 "WampClientProtocol", 30 "WampClientFactory", 31 "WampCraProtocol", 32 "WampCraClientProtocol", 33 "WampCraServerProtocol", 34 "json_lib", 35 "json_loads", 36 "json_dumps",) 37 38 39import inspect, types 40import traceback 41 42if PY3: 43 from io import StringIO 44else: 45 import StringIO 46 47import hashlib, hmac, binascii, random 48 49from twisted.python import log 50from twisted.internet.defer import Deferred, \ 51 maybeDeferred 52 53from autobahn import __version__ 54 55from autobahn.websocket.protocol import WebSocketProtocol 56 57from autobahn.websocket import http 58from autobahn.twisted.websocket import WebSocketClientProtocol, \ 59 WebSocketClientFactory, \ 60 WebSocketServerFactory, \ 61 WebSocketServerProtocol 62from autobahn.wamp1.pbkdf2 import pbkdf2_bin 63from autobahn.wamp1.prefixmap import PrefixMap 64from autobahn.util import utcnow, newid, Tracker 65 66 67def exportRpc(arg = None): 68 """ 69 Decorator for RPC'ed callables. 
70 """ 71 ## decorator without argument 72 if type(arg) is types.FunctionType: 73 arg._autobahn_rpc_id = arg.__name__ 74 return arg 75 ## decorator with argument 76 else: 77 def inner(f): 78 f._autobahn_rpc_id = arg 79 return f 80 return inner 81 82def exportSub(arg, prefixMatch = False): 83 """ 84 Decorator for subscription handlers. 85 """ 86 def inner(f): 87 f._autobahn_sub_id = arg 88 f._autobahn_sub_prefix_match = prefixMatch 89 return f 90 return inner 91 92def exportPub(arg, prefixMatch = False): 93 """ 94 Decorator for publication handlers. 95 """ 96 def inner(f): 97 f._autobahn_pub_id = arg 98 f._autobahn_pub_prefix_match = prefixMatch 99 return f 100 return inner 101 102 103class WampProtocol: 104 """ 105 WAMP protocol base class. Mixin for WampServerProtocol and WampClientProtocol. 106 """ 107 108 URI_WAMP_BASE = "http://api.wamp.ws/" 109 """ 110 WAMP base URI for WAMP predefined things. 111 """ 112 113 URI_WAMP_ERROR = URI_WAMP_BASE + "error#" 114 """ 115 Prefix for WAMP errors. 116 """ 117 118 URI_WAMP_PROCEDURE = URI_WAMP_BASE + "procedure#" 119 """ 120 Prefix for WAMP predefined RPC endpoints. 121 """ 122 123 URI_WAMP_TOPIC = URI_WAMP_BASE + "topic#" 124 """ 125 Prefix for WAMP predefined PubSub topics. 126 """ 127 128 URI_WAMP_ERROR_GENERIC = URI_WAMP_ERROR + "generic" 129 """ 130 WAMP error URI for generic errors. 131 """ 132 133 DESC_WAMP_ERROR_GENERIC = "generic error" 134 """ 135 Description for WAMP generic errors. 136 """ 137 138 URI_WAMP_ERROR_INTERNAL = URI_WAMP_ERROR + "internal" 139 """ 140 WAMP error URI for internal errors. 141 """ 142 143 DESC_WAMP_ERROR_INTERNAL = "internal error" 144 """ 145 Description for WAMP internal errors. 146 """ 147 148 URI_WAMP_ERROR_NO_SUCH_RPC_ENDPOINT = URI_WAMP_ERROR + "NoSuchRPCEndpoint" 149 """ 150 WAMP error URI for RPC endpoint not found. 151 """ 152 153 WAMP_PROTOCOL_VERSION = 1 154 """ 155 WAMP version this server speaks. Versions are numbered consecutively 156 (integers, no gaps). 
157 """ 158 159 MESSAGE_TYPEID_WELCOME = 0 160 """ 161 Server-to-client welcome message containing session ID. 162 """ 163 164 MESSAGE_TYPEID_PREFIX = 1 165 """ 166 Client-to-server message establishing a URI prefix to be used in CURIEs. 167 """ 168 169 MESSAGE_TYPEID_CALL = 2 170 """ 171 Client-to-server message initiating an RPC. 172 """ 173 174 MESSAGE_TYPEID_CALL_RESULT = 3 175 """ 176 Server-to-client message returning the result of a successful RPC. 177 """ 178 179 MESSAGE_TYPEID_CALL_ERROR = 4 180 """ 181 Server-to-client message returning the error of a failed RPC. 182 """ 183 184 MESSAGE_TYPEID_SUBSCRIBE = 5 185 """ 186 Client-to-server message subscribing to a topic. 187 """ 188 189 MESSAGE_TYPEID_UNSUBSCRIBE = 6 190 """ 191 Client-to-server message unsubscribing from a topic. 192 """ 193 194 MESSAGE_TYPEID_PUBLISH = 7 195 """ 196 Client-to-server message publishing an event to a topic. 197 """ 198 199 MESSAGE_TYPEID_EVENT = 8 200 """ 201 Server-to-client message providing the event of a (subscribed) topic. 202 """ 203 204 def connectionMade(self): 205 self.debugWamp = self.factory.debugWamp 206 self.debugApp = self.factory.debugApp 207 self.prefixes = PrefixMap() 208 self.calls = {} 209 self.procs = {} 210 211 212 def connectionLost(self, reason): 213 pass 214 215 216 def _protocolError(self, reason): 217 if self.debugWamp: 218 log.msg("Closing Wamp session on protocol violation : %s" % reason) 219 220 ## FIXME: subprotocols are probably not supposed to close with CLOSE_STATUS_CODE_PROTOCOL_ERROR 221 ## 222 self.protocolViolation("Wamp RPC/PubSub protocol violation ('%s')" % reason) 223 224 225 def shrink(self, uri, passthrough = False): 226 """ 227 Shrink given URI to CURIE according to current prefix mapping. 228 If no appropriate prefix mapping is available, return original URI. 229 230 :param uri: URI to shrink. 231 :type uri: str 232 233 :returns str -- CURIE or original URI. 
234 """ 235 return self.prefixes.shrink(uri) 236 237 238 def resolve(self, curieOrUri, passthrough = False): 239 """ 240 Resolve given CURIE/URI according to current prefix mapping or return 241 None if cannot be resolved. 242 243 :param curieOrUri: CURIE or URI. 244 :type curieOrUri: str 245 246 :returns: str -- Full URI for CURIE or None. 247 """ 248 return self.prefixes.resolve(curieOrUri) 249 250 251 def resolveOrPass(self, curieOrUri): 252 """ 253 Resolve given CURIE/URI according to current prefix mapping or return 254 string verbatim if cannot be resolved. 255 256 :param curieOrUri: CURIE or URI. 257 :type curieOrUri: str 258 259 :returns: str -- Full URI for CURIE or original string. 260 """ 261 return self.prefixes.resolveOrPass(curieOrUri) 262 263 264 def serializeMessage(self, msg): 265 """ 266 Delegate message serialization to the factory. 267 :param msg: The message to be serialized. 268 :type msg: str 269 :return: The serialized message. 270 """ 271 return self.factory._serialize(msg) 272 273 274 def registerForRpc(self, obj, baseUri = "", methods = None): 275 """ 276 Register an service object for RPC. A service object has methods 277 which are decorated using @exportRpc. 278 279 :param obj: The object to be registered (in this WebSockets session) for RPC. 280 :type obj: Object with methods decorated using @exportRpc. 281 :param baseUri: Optional base URI which is prepended to method names for export. 282 :type baseUri: String. 283 :param methods: If not None, a list of unbound class methods corresponding to obj 284 which should be registered. This can be used to register only a subset 285 of the methods decorated with @exportRpc. 286 :type methods: List of unbound class methods. 
287 """ 288 for k in inspect.getmembers(obj.__class__, inspect.ismethod): 289 if k[1].__dict__.has_key("_autobahn_rpc_id"): 290 if methods is None or k[1] in methods: 291 uri = baseUri + k[1].__dict__["_autobahn_rpc_id"] 292 proc = k[1] 293 self.registerMethodForRpc(uri, obj, proc) 294 295 296 def registerMethodForRpc(self, uri, obj, proc): 297 """ 298 Register a method of an object for RPC. 299 300 :param uri: URI to register RPC method under. 301 :type uri: str 302 :param obj: The object on which to register a method for RPC. 303 :type obj: object 304 :param proc: Unbound object method to register RPC for. 305 :type proc: unbound method 306 """ 307 self.procs[uri] = (obj, proc, False) 308 if self.debugWamp: 309 log.msg("registered remote method on %s" % uri) 310 311 312 def registerProcedureForRpc(self, uri, proc): 313 """ 314 Register a (free standing) function/procedure for RPC. 315 316 :param uri: URI to register RPC function/procedure under. 317 :type uri: str 318 :param proc: Free-standing function/procedure. 319 :type proc: callable 320 """ 321 self.procs[uri] = (None, proc, False) 322 if self.debugWamp: 323 log.msg("registered remote procedure on %s" % uri) 324 325 326 def registerHandlerMethodForRpc(self, uri, obj, handler, extra = None): 327 """ 328 Register a handler on an object for RPC. 329 330 :param uri: URI to register RPC method under. 331 :type uri: str 332 :param obj: The object on which to register the RPC handler 333 :type obj: object 334 :param proc: Unbound object method to register RPC for. 335 :type proc: unbound method 336 :param extra: Optional extra data that will be given to the handler at call time. 337 :type extra: object 338 """ 339 self.procs[uri] = (obj, handler, True, extra) 340 if self.debugWamp: 341 log.msg("registered remote handler method on %s" % uri) 342 343 344 def registerHandlerProcedureForRpc(self, uri, handler, extra = None): 345 """ 346 Register a (free standing) handler for RPC. 
347 348 :param uri: URI to register RPC handler under. 349 :type uri: str 350 :param proc: Free-standing handler 351 :type proc: callable 352 :param extra: Optional extra data that will be given to the handler at call time. 353 :type extra: object 354 """ 355 self.procs[uri] = (None, handler, True, extra) 356 if self.debugWamp: 357 log.msg("registered remote handler procedure on %s" % uri) 358 359 360 def procForUri(self, uri): 361 """ 362 Returns the procedure specification for `uri` or None, if it does not exist. 363 364 :param uri: URI to be checked. 365 :type uri: str 366 :returns: The procedure specification for `uri`, if it exists, 367 `None` otherwise. 368 """ 369 return self.procs[uri] if uri in self.procs else None 370 371 372 def onBeforeCall(self, callid, uri, args, isRegistered): 373 """ 374 Callback fired before executing incoming RPC. This can be used for 375 logging, statistics tracking or redirecting RPCs or argument mangling i.e. 376 377 The default implementation just returns the incoming URI/args. 378 379 :param uri: RPC endpoint URI (fully-qualified). 380 :type uri: str 381 :param args: RPC arguments array. 382 :type args: list 383 :param isRegistered: True, iff RPC endpoint URI is registered in this session. 384 :type isRegistered: bool 385 :returns pair -- Must return URI/Args pair. 386 """ 387 return uri, args 388 389 390 def onAfterCallSuccess(self, result, call): 391 """ 392 Callback fired after executing incoming RPC with success, but before 393 sending the RPC success message. 394 395 The default implementation will just return `result` to the client. 396 397 :param result: Result returned for executing the incoming RPC. 398 :type result: Anything returned by the user code for the endpoint. 399 :param call: WAMP call object for incoming RPC. 400 :type call: instance of Call 401 :returns obj -- Result send back to client. 
402 """ 403 return result 404 405 406 def onAfterCallError(self, error, call): 407 """ 408 Callback fired after executing incoming RPC with failure, but before 409 sending the RPC error message. 410 411 The default implementation will just return `error` to the client. 412 413 :param error: Error that occurred during incomnig RPC call execution. 414 :type error: Instance of twisted.python.failure.Failure 415 :param call: WAMP call object for incoming RPC. 416 :type call: instance of Call 417 :returns twisted.python.failure.Failure -- Error sent back to client. 418 """ 419 return error 420 421 422 def onAfterSendCallSuccess(self, msg, call): 423 """ 424 Callback fired after sending RPC success message. 425 426 :param msg: Serialized WAMP message. 427 :type msg: str 428 :param call: WAMP call object for incoming RPC. 429 :type call: instance of Call 430 """ 431 pass 432 433 434 def onAfterSendCallError(self, msg, call): 435 """ 436 Callback fired after sending RPC error message. 437 438 :param msg: Serialized WAMP message. 439 :type msg: str 440 :param call: WAMP call object for incoming RPC. 441 :type call: instance of Call 442 """ 443 pass 444 445 446 def call(self, *args): 447 """ 448 Perform a remote-procedure call (RPC). The first argument is the procedure 449 URI (mandatory). Subsequent positional arguments can be provided (must be 450 JSON serializable). The return value is a Twisted Deferred. 
451 """ 452 453 if len(args) < 1: 454 raise Exception("missing procedure URI") 455 456 if type(args[0]) not in [unicode, str]: 457 raise Exception("invalid type for procedure URI") 458 459 procuri = args[0] 460 callid = None 461 while True: 462 callid = newid() 463 if not self.calls.has_key(callid): 464 break 465 d = Deferred() 466 self.calls[callid] = d 467 msg = [WampProtocol.MESSAGE_TYPEID_CALL, callid, procuri] 468 msg.extend(args[1:]) 469 470 try: 471 o = self.factory._serialize(msg) 472 except: 473 raise Exception("call argument(s) not JSON serializable") 474 475 self.sendMessage(o) 476 return d 477 478 479 480## use Ultrajson (https://github.com/esnme/ultrajson) if available 481## 482try: 483 import ujson 484 json_lib = ujson 485 json_loads = ujson.loads 486 json_dumps = lambda x: ujson.dumps(x, ensure_ascii = False) 487except: 488 import json 489 json_lib = json 490 json_loads = json.loads 491 json_dumps = json.dumps 492 493 494 495class WampFactory: 496 """ 497 WAMP factory base class. Mixin for WampServerFactory and WampClientFactory. 498 """ 499 500 def __init__(self): 501 if self.debugWamp: 502 log.msg("Using JSON processor '%s'" % json_lib.__name__) 503 504 505 def _serialize(self, obj): 506 """ 507 Default object serializer. 508 """ 509 return json_dumps(obj) 510 511 512 def _unserialize(self, bytes): 513 """ 514 Default object deserializer. 515 """ 516 return json_loads(bytes) 517 518 519 520class WampServerProtocol(WebSocketServerProtocol, WampProtocol): 521 """ 522 Server factory for Wamp RPC/PubSub. 523 """ 524 525 SUBSCRIBE = 1 526 PUBLISH = 2 527 528 def onSessionOpen(self): 529 """ 530 Callback fired when WAMP session was fully established. 531 """ 532 pass 533 534 535 def onOpen(self): 536 """ 537 Default implementation for WAMP connection opened sends 538 Welcome message containing session ID. 
539 """ 540 self.session_id = newid() 541 542 ## include traceback as error detail for RPC errors with 543 ## no error URI - that is errors returned with URI_WAMP_ERROR_GENERIC 544 self.includeTraceback = False 545 546 msg = [WampProtocol.MESSAGE_TYPEID_WELCOME, 547 self.session_id, 548 WampProtocol.WAMP_PROTOCOL_VERSION, 549 "Autobahn/%s" % __version__] 550 o = self.factory._serialize(msg) 551 self.sendMessage(o) 552 553 self.factory._addSession(self, self.session_id) 554 self.onSessionOpen() 555 556 557 def onConnect(self, connectionRequest): 558 """ 559 Default implementation for WAMP connection acceptance: 560 check if client announced WAMP subprotocol, and only accept connection 561 if client did so. 562 """ 563 for p in connectionRequest.protocols: 564 if p in self.factory.protocols: 565 return (p, {}) # return (protocol, headers) 566 raise http.HttpException(http.BAD_REQUEST[0], "this server only speaks WAMP") 567 568 569 def connectionMade(self): 570 WebSocketServerProtocol.connectionMade(self) 571 WampProtocol.connectionMade(self) 572 573 ## RPCs registered in this session (a URI map of (object, procedure) 574 ## pairs for object methods or (None, procedure) for free standing procedures) 575 self.procs = {} 576 577 ## Publication handlers registered in this session (a URI map of (object, pubHandler) pairs 578 ## pairs for object methods (handlers) or (None, None) for topic without handler) 579 self.pubHandlers = {} 580 581 ## Subscription handlers registered in this session (a URI map of (object, subHandler) pairs 582 ## pairs for object methods (handlers) or (None, None) for topic without handler) 583 self.subHandlers = {} 584 585 self.handlerMapping = { 586 self.MESSAGE_TYPEID_CALL: CallHandler(self, self.prefixes), 587 self.MESSAGE_TYPEID_CALL_RESULT: CallResultHandler(self, self.prefixes), 588 self.MESSAGE_TYPEID_CALL_ERROR: CallErrorHandler(self, self.prefixes)} 589 590 591 def connectionLost(self, reason): 592 self.factory._unsubscribeClient(self) 
593 self.factory._removeSession(self) 594 595 WampProtocol.connectionLost(self, reason) 596 WebSocketServerProtocol.connectionLost(self, reason) 597 598 599 def sendMessage(self, 600 payload, 601 binary = False, 602 payload_frag_size = None, 603 sync = False, 604 doNotCompress = False): 605 if self.debugWamp: 606 log.msg("TX WAMP: %s" % str(payload)) 607 WebSocketServerProtocol.sendMessage(self, 608 payload, 609 binary, 610 payload_frag_size, 611 sync, 612 doNotCompress) 613 614 615 def _getPubHandler(self, topicUri): 616 ## Longest matching prefix based resolution of (full) topic URI to 617 ## publication handler. 618 ## Returns a 5-tuple (consumedUriPart, unconsumedUriPart, handlerObj, handlerProc, prefixMatch) 619 ## 620 for i in xrange(len(topicUri), -1, -1): 621 tt = topicUri[:i] 622 if self.pubHandlers.has_key(tt): 623 h = self.pubHandlers[tt] 624 return (tt, topicUri[i:], h[0], h[1], h[2]) 625 return None 626 627 628 def _getSubHandler(self, topicUri): 629 ## Longest matching prefix based resolution of (full) topic URI to 630 ## subscription handler. 631 ## Returns a 5-tuple (consumedUriPart, unconsumedUriPart, handlerObj, handlerProc, prefixMatch) 632 ## 633 for i in xrange(len(topicUri), -1, -1): 634 tt = topicUri[:i] 635 if self.subHandlers.has_key(tt): 636 h = self.subHandlers[tt] 637 return (tt, topicUri[i:], h[0], h[1], h[2]) 638 return None 639 640 641 def registerForPubSub(self, topicUri, prefixMatch = False, pubsub = PUBLISH | SUBSCRIBE): 642 """ 643 Register a topic URI as publish/subscribe channel in this session. 644 645 :param topicUri: Topic URI to be established as publish/subscribe channel. 646 :type topicUri: str 647 :param prefixMatch: Allow to match this topic URI by prefix. 648 :type prefixMatch: bool 649 :param pubsub: Allow publication and/or subscription. 
650 :type pubsub: WampServerProtocol.PUB, WampServerProtocol.SUB, WampServerProtocol.PUB | WampServerProtocol.SUB 651 """ 652 if pubsub & WampServerProtocol.PUBLISH: 653 self.pubHandlers[topicUri] = (None, None, prefixMatch) 654 if self.debugWamp: 655 log.msg("registered topic %s for publication (match by prefix = %s)" % (topicUri, prefixMatch)) 656 if pubsub & WampServerProtocol.SUBSCRIBE: 657 self.subHandlers[topicUri] = (None, None, prefixMatch) 658 if self.debugWamp: 659 log.msg("registered topic %s for subscription (match by prefix = %s)" % (topicUri, prefixMatch)) 660 661 662 def registerHandlerForPubSub(self, obj, baseUri = ""): 663 """ 664 Register a handler object for PubSub. A handler object has methods 665 which are decorated using @exportPub and @exportSub. 666 667 :param obj: The object to be registered (in this WebSockets session) for PubSub. 668 :type obj: Object with methods decorated using @exportPub and @exportSub. 669 :param baseUri: Optional base URI which is prepended to topic names for export. 670 :type baseUri: String. 671 """ 672 for k in inspect.getmembers(obj.__class__, inspect.ismethod): 673 if k[1].__dict__.has_key("_autobahn_pub_id"): 674 uri = baseUri + k[1].__dict__["_autobahn_pub_id"] 675 prefixMatch = k[1].__dict__["_autobahn_pub_prefix_match"] 676 proc = k[1] 677 self.registerHandlerForPub(uri, obj, proc, prefixMatch) 678 elif k[1].__dict__.has_key("_autobahn_sub_id"): 679 uri = baseUri + k[1].__dict__["_autobahn_sub_id"] 680 prefixMatch = k[1].__dict__["_autobahn_sub_prefix_match"] 681 proc = k[1] 682 self.registerHandlerForSub(uri, obj, proc, prefixMatch) 683 684 685 def registerHandlerForSub(self, uri, obj, proc, prefixMatch = False): 686 """ 687 Register a method of an object as subscription handler. 688 689 :param uri: Topic URI to register subscription handler for. 690 :type uri: str 691 :param obj: The object on which to register a method as subscription handler. 
692 :type obj: object 693 :param proc: Unbound object method to register as subscription handler. 694 :type proc: unbound method 695 :param prefixMatch: Allow to match this topic URI by prefix. 696 :type prefixMatch: bool 697 """ 698 self.subHandlers[uri] = (obj, proc, prefixMatch) 699 if not self.pubHandlers.has_key(uri): 700 self.pubHandlers[uri] = (None, None, False) 701 if self.debugWamp: 702 log.msg("registered subscription handler for topic %s" % uri) 703 704 705 def registerHandlerForPub(self, uri, obj, proc, prefixMatch = False): 706 """ 707 Register a method of an object as publication handler. 708 709 :param uri: Topic URI to register publication handler for. 710 :type uri: str 711 :param obj: The object on which to register a method as publication handler. 712 :type obj: object 713 :param proc: Unbound object method to register as publication handler. 714 :type proc: unbound method 715 :param prefixMatch: Allow to match this topic URI by prefix. 716 :type prefixMatch: bool 717 """ 718 self.pubHandlers[uri] = (obj, proc, prefixMatch) 719 if not self.subHandlers.has_key(uri): 720 self.subHandlers[uri] = (None, None, False) 721 if self.debugWamp: 722 log.msg("registered publication handler for topic %s" % uri) 723 724 725 # noinspection PyDefaultArgument 726 def dispatch(self, topicUri, event, exclude = [], eligible = None): 727 """ 728 Dispatch an event for a topic to all clients subscribed to 729 and authorized for that topic. 730 731 Optionally, exclude list of clients and/or only consider clients 732 from explicit eligibles. In other words, the event is delivered 733 to the set 734 735 (subscribers - excluded) & eligible 736 737 :param topicUri: URI of topic to publish event to. 738 :type topicUri: str 739 :param event: Event to dispatch. 740 :type event: obj 741 :param exclude: Optional list of clients (WampServerProtocol instances) to exclude. 
742 :type exclude: list of obj 743 :param eligible: Optional list of clients (WampServerProtocol instances) eligible at all (or None for all). 744 :type eligible: list of obj 745 746 :returns twisted.internet.defer.Deferred -- Will be fired when event was 747 dispatched to all subscribers. The return value provided to the deferred 748 is a pair (delivered, requested), where delivered = number of actual 749 receivers, and requested = number of (subscribers - excluded) & eligible. 750 """ 751 return self.factory.dispatch(topicUri, event, exclude, eligible) 752 753 754 def onMessage(self, msg, binary): 755 """ 756 Handle WAMP messages received from WAMP client. 757 """ 758 759 if self.debugWamp: 760 log.msg("RX WAMP: %s" % str(msg)) 761 762 if not binary: 763 try: 764 obj = self.factory._unserialize(msg) 765 if type(obj) == list: 766 767 msgtype = obj[0] 768 769 ### XXX Replace check by try...except when all handlers 770 ### XXX are in place. Exception handling should create 771 ### XXX a protocolError message about unsupported 772 ### XXX message type 773 if msgtype in [WampProtocol.MESSAGE_TYPEID_CALL, 774 WampProtocol.MESSAGE_TYPEID_CALL_RESULT, 775 WampProtocol.MESSAGE_TYPEID_CALL_ERROR]: 776 self.handlerMapping[msgtype].handleMessage(obj) 777 778 ### XXX Move remaining code to appropriate handlers 779 780 ## Subscribe Message 781 ## 782 elif msgtype == WampProtocol.MESSAGE_TYPEID_SUBSCRIBE: 783 topicUri = self.prefixes.resolveOrPass(obj[1]) ### PFX - remove 784 h = self._getSubHandler(topicUri) 785 if h: 786 ## either exact match or prefix match allowed 787 if h[1] == "" or h[4]: 788 789 ## direct topic 790 if h[2] is None and h[3] is None: 791 self.factory._subscribeClient(self, topicUri) 792 793 ## topic handled by subscription handler 794 else: 795 ## handler is object method 796 if h[2]: 797 a = maybeDeferred(h[3], h[2], str(h[0]), str(h[1])) 798 799 ## handler is free standing procedure 800 else: 801 a = maybeDeferred(h[3], str(h[0]), str(h[1])) 802 803 def 
fail(failure): 804 if self.debugWamp: 805 log.msg("exception during custom subscription handler: %s" % failure) 806 807 def done(result): 808 ## only subscribe client if handler did return True 809 if result: 810 self.factory._subscribeClient(self, topicUri) 811 812 a.addCallback(done).addErrback(fail) 813 else: 814 if self.debugWamp: 815 log.msg("topic %s matches only by prefix and prefix match disallowed" % topicUri) 816 else: 817 if self.debugWamp: 818 log.msg("no topic / subscription handler registered for %s" % topicUri) 819 820 ## Unsubscribe Message 821 ## 822 elif msgtype == WampProtocol.MESSAGE_TYPEID_UNSUBSCRIBE: 823 topicUri = self.prefixes.resolveOrPass(obj[1]) ### PFX - remove 824 self.factory._unsubscribeClient(self, topicUri) 825 826 ## Publish Message 827 ## 828 elif msgtype == WampProtocol.MESSAGE_TYPEID_PUBLISH: 829 topicUri = self.prefixes.resolveOrPass(obj[1]) ### PFX - remove 830 h = self._getPubHandler(topicUri) 831 if h: 832 ## either exact match or prefix match allowed 833 if h[1] == "" or h[4]: 834 835 ## Event 836 ## 837 event = obj[2] 838 839 ## Exclude Sessions List 840 ## 841 exclude = [self] # exclude publisher by default 842 if len(obj) >= 4: 843 if type(obj[3]) == bool: 844 if not obj[3]: 845 exclude = [] 846 elif type(obj[3]) == list: 847 ## map session IDs to protos 848 exclude = self.factory.sessionIdsToProtos(obj[3]) 849 else: 850 ## FIXME: invalid type 851 pass 852 853 ## Eligible Sessions List 854 ## 855 eligible = None # all sessions are eligible by default 856 if len(obj) >= 5: 857 if type(obj[4]) == list: 858 ## map session IDs to protos 859 eligible = self.factory.sessionIdsToProtos(obj[4]) 860 else: 861 ## FIXME: invalid type 862 pass 863 864 ## direct topic 865 if h[2] is None and h[3] is None: 866 self.factory.dispatch(topicUri, event, exclude, eligible) 867 868 ## topic handled by publication handler 869 else: 870 ## handler is object method 871 if h[2]: 872 e = maybeDeferred(h[3], h[2], str(h[0]), str(h[1]), event) 873 
874 ## handler is free standing procedure 875 else: 876 e = maybeDeferred(h[3], str(h[0]), str(h[1]), event) 877 878 def fail(failure): 879 if self.debugWamp: 880 log.msg("exception during custom publication handler: %s" % failure) 881 882 def done(result): 883 ## only dispatch event if handler did return event 884 if result: 885 self.factory.dispatch(topicUri, result, exclude, eligible) 886 887 e.addCallback(done).addErrback(fail) 888 else: 889 if self.debugWamp: 890 log.msg("topic %s matches only by prefix and prefix match disallowed" % topicUri) 891 else: 892 if self.debugWamp: 893 log.msg("no topic / publication handler registered for %s" % topicUri) 894 895 ## Define prefix to be used in CURIEs 896 ## 897 elif msgtype == WampProtocol.MESSAGE_TYPEID_PREFIX: 898 prefix = obj[1] 899 uri = obj[2] 900 self.prefixes.set(prefix, uri) ### PFX - remove whole block (this msg type won't survive) 901 902 else: 903 log.msg("unknown message type") 904 else: 905 log.msg("msg not a list") 906 except Exception: 907 traceback.print_exc() 908 else: 909 log.msg("binary message") 910 911 912 913class WampServerFactory(WebSocketServerFactory, WampFactory): 914 """ 915 Server factory for Wamp RPC/PubSub. 916 """ 917 918 protocol = WampServerProtocol 919 """ 920 Twisted protocol used by default for WAMP servers. 921 """ 922 923 def __init__(self, 924 url, 925 debug = False, 926 debugCodePaths = False, 927 debugWamp = False, 928 debugApp = False, 929 externalPort = None, 930 reactor = None): 931 self.debugWamp = debugWamp 932 self.debugApp = debugApp 933 WebSocketServerFactory.__init__(self, 934 url, 935 protocols = ["wamp"], 936 debug = debug, 937 debugCodePaths = debugCodePaths, 938 externalPort = externalPort, 939 reactor = reactor) 940 WampFactory.__init__(self) 941 942 943 def onClientSubscribed(self, proto, topicUri): 944 """ 945 Callback fired when peer was (successfully) subscribed on some topic. 946 947 :param proto: Peer protocol instance subscribed. 
948 :type proto: Instance of WampServerProtocol. 949 :param topicUri: Fully qualified, resolved URI of topic subscribed. 950 :type topicUri: str 951 """ 952 pass 953 954 955 def _subscribeClient(self, proto, topicUri): 956 """ 957 Called from proto to subscribe client for topic. 958 """ 959 if not self.subscriptions.has_key(topicUri): 960 self.subscriptions[topicUri] = set() 961 if self.debugWamp: 962 log.msg("subscriptions map created for topic %s" % topicUri) 963 if not proto in self.subscriptions[topicUri]: 964 self.subscriptions[topicUri].add(proto) 965 if self.debugWamp: 966 log.msg("subscribed peer %s on topic %s" % (proto.peer, topicUri)) 967 self.onClientSubscribed(proto, topicUri) 968 else: 969 if self.debugWamp: 970 log.msg("peer %s already subscribed on topic %s" % (proto.peer, topicUri)) 971 972 973 def onClientUnsubscribed(self, proto, topicUri): 974 """ 975 Callback fired when peer was (successfully) unsubscribed from some topic. 976 977 :param proto: Peer protocol instance unsubscribed. 978 :type proto: Instance of WampServerProtocol. 979 :param topicUri: Fully qualified, resolved URI of topic unsubscribed. 980 :type topicUri: str 981 """ 982 pass 983 984 985 def _unsubscribeClient(self, proto, topicUri = None): 986 """ 987 Called from proto to unsubscribe client from topic. 
988 """ 989 if topicUri: 990 if self.subscriptions.has_key(topicUri) and proto in self.subscriptions[topicUri]: 991 self.subscriptions[topicUri].discard(proto) 992 if self.debugWamp: 993 log.msg("unsubscribed peer %s from topic %s" % (proto.peer, topicUri)) 994 if len(self.subscriptions[topicUri]) == 0: 995 del self.subscriptions[topicUri] 996 if self.debugWamp: 997 log.msg("topic %s removed from subscriptions map - no one subscribed anymore" % topicUri) 998 self.onClientUnsubscribed(proto, topicUri) 999 else: 1000 if self.debugWamp: 1001 log.msg("peer %s not subscribed on topic %s" % (proto.peer, topicUri)) 1002 else: 1003 for topicUri, subscribers in self.subscriptions.items(): 1004 if proto in subscribers: 1005 subscribers.discard(proto) 1006 if self.debugWamp: 1007 log.msg("unsubscribed peer %s from topic %s" % (proto.peer, topicUri)) 1008 if len(subscribers) == 0: 1009 del self.subscriptions[topicUri] 1010 if self.debugWamp: 1011 log.msg("topic %s removed from subscriptions map - no one subscribed anymore" % topicUri) 1012 self.onClientUnsubscribed(proto, topicUri) 1013 if self.debugWamp: 1014 log.msg("unsubscribed peer %s from all topics" % (proto.peer)) 1015 1016 1017 # noinspection PyDefaultArgument 1018 def dispatch(self, topicUri, event, exclude = [], eligible = None): 1019 """ 1020 Dispatch an event to all peers subscribed to the event topic. 1021 1022 :param topicUri: Topic to publish event to. 1023 :type topicUri: str 1024 :param event: Event to publish (must be JSON serializable). 1025 :type event: obj 1026 :param exclude: List of WampServerProtocol instances to exclude from receivers. 1027 :type exclude: List of obj 1028 :param eligible: List of WampServerProtocol instances eligible as receivers (or None for all). 1029 :type eligible: List of obj 1030 1031 :returns twisted.internet.defer.Deferred -- Will be fired when event was 1032 dispatched to all subscribers. 
The return value provided to the deferred 1033 is a pair (delivered, requested), where delivered = number of actual 1034 receivers, and requested = number of (subscribers - excluded) & eligible. 1035 """ 1036 if self.debugWamp: 1037 log.msg("publish event %s for topicUri %s" % (str(event), topicUri)) 1038 1039 d = Deferred() 1040 1041 if self.subscriptions.has_key(topicUri) and len(self.subscriptions[topicUri]) > 0: 1042 1043 ## FIXME: this might break ordering of event delivery from a 1044 ## receiver perspective. We might need to have send queues 1045 ## per receiver OR do recvs = deque(sorted(..)) 1046 1047 ## However, see http://twistedmatrix.com/trac/ticket/1396 1048 1049 if eligible is not None: 1050 subscrbs = set(eligible) & self.subscriptions[topicUri] 1051 else: 1052 subscrbs = self.subscriptions[topicUri] 1053 1054 if len(exclude) > 0: 1055 recvs = subscrbs - set(exclude) 1056 else: 1057 recvs = subscrbs 1058 1059 l = len(recvs) 1060 if l > 0: 1061 1062 ## ok, at least 1 subscriber not excluded and eligible 1063 ## => prepare message for mass sending 1064 ## 1065 o = [WampProtocol.MESSAGE_TYPEID_EVENT, topicUri, event] 1066 try: 1067 msg = self._serialize(o) 1068 if self.debugWamp: 1069 log.msg("serialized event msg: " + str(msg)) 1070 except Exception as e: 1071 raise Exception("invalid type for event - serialization failed [%s]" % e) 1072 1073 preparedMsg = self.prepareMessage(msg) 1074 1075 ## chunked sending of prepared message 1076 ## 1077 self._sendEvents(preparedMsg, recvs.copy(), 0, l, d) 1078 1079 else: 1080 ## receivers list empty after considering exlude and eligible sessions 1081 ## 1082 d.callback((0, 0)) 1083 else: 1084 ## no one subscribed on topic 1085 ## 1086 d.callback((0, 0)) 1087 1088 return d 1089 1090 1091 def _sendEvents(self, preparedMsg, recvs, delivered, requested, d): 1092 """ 1093 Delivers events to receivers in chunks and reenters the reactor 1094 in-between, so that other stuff can run. 
1095 """ 1096 ## deliver a batch of events 1097 done = False 1098 for i in xrange(0, 256): 1099 try: 1100 proto = recvs.pop() 1101 if proto.state == WebSocketProtocol.STATE_OPEN: 1102 try: 1103 proto.sendPreparedMessage(preparedMsg) 1104 except: 1105 pass 1106 else: 1107 if self.debugWamp: 1108 log.msg("delivered event to peer %s" % proto.peer) 1109 delivered += 1 1110 except KeyError: 1111 # all receivers done 1112 done = True 1113 break 1114 1115 if not done: 1116 ## if there are receivers left, redo 1117 self.reactor.callLater(0, self._sendEvents, preparedMsg, recvs, delivered, requested, d) 1118 else: 1119 ## else fire final result 1120 d.callback((delivered, requested)) 1121 1122 1123 def _addSession(self, proto, session_id): 1124 """ 1125 Add proto for session ID. 1126 """ 1127 if not self.protoToSessions.has_key(proto): 1128 self.protoToSessions[proto] = session_id 1129 else: 1130 raise Exception("logic error - dublicate _addSession for protoToSessions") 1131 if not self.sessionsToProto.has_key(session_id): 1132 self.sessionsToProto[session_id] = proto 1133 else: 1134 raise Exception("logic error - dublicate _addSession for sessionsToProto") 1135 1136 1137 def _removeSession(self, proto): 1138 """ 1139 Remove session by proto. 1140 """ 1141 if self.protoToSessions.has_key(proto): 1142 session_id = self.protoToSessions[proto] 1143 del self.protoToSessions[proto] 1144 if self.sessionsToProto.has_key(session_id): 1145 del self.sessionsToProto[session_id] 1146 1147 1148 def sessionIdToProto(self, sessionId): 1149 """ 1150 Map WAMP session ID to connected protocol instance (object of type WampServerProtocol). 1151 1152 :param sessionId: WAMP session ID to be mapped. 1153 :type sessionId: str 1154 1155 :returns obj -- WampServerProtocol instance or None. 
def sessionIdsToProtos(self, sessionIds):
    """
    Map WAMP session IDs to connected protocol instances (objects of type WampServerProtocol).

    Session IDs that are not currently registered are silently skipped, so
    the result may be shorter than the input.

    :param sessionIds: List of session IDs to be mapped.
    :type sessionIds: list of str

    :returns list -- List of WampServerProtocol instances corresponding to the WAMP session IDs.
    """
    known = self.sessionsToProto
    ## "in" works on both Python 2 and 3 (dict.has_key is gone in Python 3)
    return [known[s] for s in sessionIds if s in known]
def onConnect(self, connectionResponse):
    """
    Verify that the subprotocol reported in the connection response is one of
    the subprotocols configured on the factory.

    :param connectionResponse: Connection response; its ``protocol`` attribute is checked.

    :raises Exception: When the reported subprotocol is not in ``self.factory.protocols``.
    """
    negotiated = connectionResponse.protocol
    supported = self.factory.protocols
    if negotiated in supported:
        return
    raise Exception("server does not speak WAMP")
1291 return 1292 1293 ## Every WAMP message is a list 1294 ## 1295 if type(obj) != list: 1296 self._protocolError("WAMP message payload not a list") 1297 return 1298 1299 ## Every WAMP message starts with an integer for message type 1300 ## 1301 if len(obj) < 1: 1302 self._protocolError("WAMP message without message type") 1303 return 1304 if type(obj[0]) != int: 1305 self._protocolError("WAMP message type not an integer") 1306 return 1307 1308 ## WAMP message type 1309 ## 1310 msgtype = obj[0] 1311 1312 ## Valid WAMP message types received by WAMP clients 1313 ## 1314 if msgtype not in [WampProtocol.MESSAGE_TYPEID_WELCOME, 1315 WampProtocol.MESSAGE_TYPEID_CALL, 1316 WampProtocol.MESSAGE_TYPEID_CALL_RESULT, 1317 WampProtocol.MESSAGE_TYPEID_CALL_ERROR, 1318 WampProtocol.MESSAGE_TYPEID_EVENT]: 1319 self._protocolError("invalid WAMP message type %d" % msgtype) 1320 return 1321 1322 if msgtype in [WampProtocol.MESSAGE_TYPEID_CALL, 1323 WampProtocol.MESSAGE_TYPEID_CALL_RESULT, 1324 WampProtocol.MESSAGE_TYPEID_CALL_ERROR]: 1325 self.handlerMapping[msgtype].handleMessage(obj) 1326 1327 ## WAMP EVENT 1328 ## 1329 elif msgtype == WampProtocol.MESSAGE_TYPEID_EVENT: 1330 ## Topic 1331 ## 1332 if len(obj) != 3: 1333 self._protocolError("WAMP EVENT message invalid length %d" % len(obj)) 1334 return 1335 if type(obj[1]) not in [unicode, str]: 1336 self._protocolError("invalid type for <topic> in WAMP EVENT message") 1337 return 1338 unresolvedTopicUri = str(obj[1]) 1339 topicUri = self.prefixes.resolveOrPass(unresolvedTopicUri) ### PFX - remove 1340 1341 ## Fire PubSub Handler 1342 ## 1343 if self.subscriptions.has_key(topicUri): 1344 event = obj[2] 1345 # noinspection PyCallingNonCallable 1346 self.subscriptions[topicUri](topicUri, event) 1347 else: 1348 ## event received for non-subscribed topic (could be because we 1349 ## just unsubscribed, and server already sent out event for 1350 ## previous subscription) 1351 pass 1352 1353 ## WAMP WELCOME 1354 ## 1355 elif msgtype == 
WampProtocol.MESSAGE_TYPEID_WELCOME: 1356 ## Session ID 1357 ## 1358 if len(obj) < 2: 1359 self._protocolError("WAMP WELCOME message invalid length %d" % len(obj)) 1360 return 1361 if type(obj[1]) not in [unicode, str]: 1362 self._protocolError("invalid type for <sessionid> in WAMP WELCOME message") 1363 return 1364 self.session_id = str(obj[1]) 1365 1366 ## WAMP Protocol Version 1367 ## 1368 if len(obj) > 2: 1369 if type(obj[2]) not in [int]: 1370 self._protocolError("invalid type for <version> in WAMP WELCOME message") 1371 return 1372 else: 1373 self.session_protocol_version = obj[2] 1374 else: 1375 self.session_protocol_version = None 1376 1377 ## Server Ident 1378 ## 1379 if len(obj) > 3: 1380 if type(obj[3]) not in [unicode, str]: 1381 self._protocolError("invalid type for <server> in WAMP WELCOME message") 1382 return 1383 else: 1384 self.session_server = obj[3] 1385 else: 1386 self.session_server = None 1387 1388 self.onSessionOpen() 1389 1390 else: 1391 raise Exception("logic error") 1392 1393 1394 def prefix(self, prefix, uri): 1395 """ 1396 Establishes a prefix to be used in `CURIEs <http://en.wikipedia.org/wiki/CURIE>`_ 1397 instead of URIs having that prefix for both client-to-server and 1398 server-to-client messages. 1399 1400 :param prefix: Prefix to be used in CURIEs. 1401 :type prefix: str 1402 :param uri: URI that this prefix will resolve to. 
def publish(self, topicUri, event, excludeMe = None, exclude = None, eligible = None):
    """
    Publish an event under a topic URI. The latter may be abbreviated using a
    CURIE which has been previously defined using prefix(). The event must
    be JSON serializable.

    :param topicUri: The topic URI or CURIE.
    :type topicUri: str
    :param event: Event to be published (must be JSON serializable) or None.
    :type event: value
    :param excludeMe: When True, don't deliver the published event to myself (when I'm subscribed).
    :type excludeMe: bool
    :param exclude: Optional list of session IDs to exclude from receivers.
    :type exclude: list of str
    :param eligible: Optional list of session IDs to that are eligible as receivers.
    :type eligible: list of str

    :raises Exception: When an argument has the wrong type or the message
                       cannot be serialized.
    """
    ## (str, unicode) on Python 2, (str, str) on Python 3 - the bare name
    ## "unicode" does not exist on Python 3
    string_types = (str, type(u""))

    if type(topicUri) not in string_types:
        raise Exception("invalid type for parameter 'topicUri' - must be string (was %s)" % type(topicUri))

    if excludeMe is not None and type(excludeMe) != bool:
        raise Exception("invalid type for parameter 'excludeMe' - must be bool (was %s)" % type(excludeMe))

    if exclude is not None and type(exclude) != list:
        raise Exception("invalid type for parameter 'exclude' - must be list (was %s)" % type(exclude))

    if eligible is not None and type(eligible) != list:
        raise Exception("invalid type for parameter 'eligible' - must be list (was %s)" % type(eligible))

    if exclude is not None or eligible is not None:
        ## long form: explicit exclude list, optionally followed by eligible list
        if exclude is None:
            ## only an explicit excludeMe=False keeps the publisher as receiver
            if excludeMe is None or excludeMe:
                exclude = [self.session_id]
            else:
                exclude = []
        msg = [WampProtocol.MESSAGE_TYPEID_PUBLISH, topicUri, event, exclude]
        if eligible is not None:
            msg.append(eligible)
    elif excludeMe:
        msg = [WampProtocol.MESSAGE_TYPEID_PUBLISH, topicUri, event]
    else:
        ## NOTE(review): with the default excludeMe=None this puts None as
        ## the 4th message element - confirm the receiving side accepts that.
        msg = [WampProtocol.MESSAGE_TYPEID_PUBLISH, topicUri, event, excludeMe]

    try:
        o = self.factory._serialize(msg)
    except Exception:
        raise Exception("invalid type for parameter 'event' - not JSON serializable")

    self.sendMessage(o)
def unsubscribe(self, topicUri):
    """
    Unsubscribe from topic. Will do nothing when currently not subscribed to the topic.

    :param topicUri: URI or CURIE of topic to unsubscribe from.
    :type topicUri: str

    :raises Exception: When topicUri is not a string.
    """
    ## (str, unicode) on Python 2, (str, str) on Python 3
    if type(topicUri) not in (str, type(u"")):
        raise Exception("invalid type for parameter 'topicUri' - must be string (was %s)" % type(topicUri))

    ## subscriptions are keyed by the resolved URI, while the message
    ## carries the URI/CURIE as given by the caller
    turi = self.prefixes.resolveOrPass(topicUri) ### PFX - keep
    if turi in self.subscriptions:
        msg = [WampProtocol.MESSAGE_TYPEID_UNSUBSCRIBE, topicUri]
        o = self.factory._serialize(msg)
        self.sendMessage(o)
        del self.subscriptions[turi]
def deriveKey(secret, extra = None):
    """
    Computes a derived cryptographic key from a password according to PBKDF2
    http://en.wikipedia.org/wiki/PBKDF2.

    The function will only return a derived key if at least 'salt' is
    present in the 'extra' dictionary. The complete set of attributes
    that can be set in 'extra':

       salt: The salt value to be used.
       iterations: Number of iterations of derivation algorithm to run (default 10000).
       keylen: Key length to derive (default 32).

    :param secret: The secret (password) to derive the key from.
    :param extra: Derivation parameters, or None / anything without 'salt'
                  to get the secret back unchanged.
    :type extra: dict

    :returns str -- The derived key (Base64 encoded) or the original secret.
    """
    ## "in" instead of dict.has_key(), which no longer exists on Python 3
    if isinstance(extra, dict) and 'salt' in extra:
        salt = str(extra['salt'])
        iterations = int(extra.get('iterations', 10000))
        keylen = int(extra.get('keylen', 32))
        b = pbkdf2_bin(secret, salt, iterations, keylen, hashlib.sha256)
        return binascii.b2a_base64(b).strip()
    return secret

deriveKey = staticmethod(deriveKey)
1649 :type authExtra: dict 1650 :param authSecret: The secret of the authentication credentials, something like the user password or application secret key. 1651 :type authsecret: str 1652 1653 :returns Deferred -- Deferred that fires upon authentication success (with permissions) or failure. 1654 """ 1655 1656 def _onAuthChallenge(challenge): 1657 if authKey is not None: 1658 challengeObj = self.factory._unserialize(challenge) 1659 if 'authextra' in challengeObj: 1660 authExtra = challengeObj['authextra'] 1661 sig = self.authSignature(challenge, authSecret, authExtra) 1662 else: 1663 sig = self.authSignature(challenge, authSecret) 1664 else: 1665 sig = None 1666 d = self.call(WampProtocol.URI_WAMP_PROCEDURE + "auth", sig) 1667 return d 1668 1669 d = self.call(WampProtocol.URI_WAMP_PROCEDURE + "authreq", authKey, authExtra) 1670 d.addCallback(_onAuthChallenge) 1671 return d 1672 1673 1674 1675class WampCraServerProtocol(WampServerProtocol, WampCraProtocol): 1676 """ 1677 Simple, authenticating WAMP server protocol. 1678 1679 The server lets clients perform WAMP-Challenge-Response-Authentication ("WAMP-CRA") 1680 to authenticate. The clients need to implement WAMP-CRA also of course. 1681 1682 To implement an authenticating server, override: 1683 1684 * getAuthSecret 1685 * getAuthPermissions 1686 * onAuthenticated 1687 1688 in your class deriving from this class. 1689 """ 1690 1691 clientAuthTimeout = 0 1692 """ 1693 Client authentication timeout in seconds or 0 for infinite. A client 1694 must perform authentication after the initial WebSocket handshake within 1695 this timeout or the connection is failed. 1696 """ 1697 1698 clientAuthAllowAnonymous = True 1699 """ 1700 Allow anonymous client authentication. When this is set to True, a client 1701 may "authenticate" as anonymous. 
1702 """ 1703 1704 1705 def getAuthPermissions(self, authKey, authExtra): 1706 """ 1707 Get the permissions the session is granted when the authentication succeeds 1708 for the given key / extra information. 1709 1710 Override in derived class to implement your authentication. 1711 1712 A permissions object is structured like this:: 1713 1714 {'permissions': {'rpc': [ 1715 {'uri': / RPC Endpoint URI - String /, 1716 'call': / Allow to call? - Boolean /} 1717 ], 1718 'pubsub': [ 1719 {'uri': / PubSub Topic URI / URI prefix - String /, 1720 'prefix': / URI matched by prefix? - Boolean /, 1721 'pub': / Allow to publish? - Boolean /, 1722 'sub': / Allow to subscribe? - Boolean /} 1723 ] 1724 } 1725 } 1726 1727 You can add custom information to this object. The object will be provided again 1728 when the client authentication succeeded in :meth:`onAuthenticated`. 1729 1730 :param authKey: The authentication key. 1731 :type authKey: str 1732 :param authExtra: Authentication extra information. 1733 :type authExtra: dict 1734 1735 :returns obj or Deferred -- Return a permissions object or None when no permissions granted. 1736 """ 1737 return None 1738 1739 1740 def getAuthSecret(self, authKey): 1741 """ 1742 Get the authentication secret for an authentication key, i.e. the 1743 user password for the user name. Return None when the authentication 1744 key does not exist. 1745 1746 Override in derived class to implement your authentication. 1747 1748 :param authKey: The authentication key. 1749 :type authKey: str 1750 1751 :returns str or Deferred -- The authentication secret for the key or None when the key does not exist. 1752 """ 1753 return None 1754 1755 1756 def onAuthTimeout(self): 1757 """ 1758 Fired when the client does not authenticate itself in time. The default implementation 1759 will simply fail the connection. 1760 1761 May be overridden in derived class. 
def registerForPubSubFromPermissions(self, permissions):
    """
    Register topics for PubSub from auth permissions.

    Each entry of ``permissions['pubsub']`` is read for the keys 'uri',
    'prefix', 'pub' and 'sub' and passed on to ``registerForPubSub``.

    :param permissions: The permissions granted to the now authenticated client.
    :type permissions: dict
    """
    for p in permissions['pubsub']:
        ## register topics for the clients
        ##
        pubsub = (WampServerProtocol.PUBLISH if p['pub'] else 0) | \
                 (WampServerProtocol.SUBSCRIBE if p['sub'] else 0)
        topic = p['uri']
        if topic in self.pubHandlers or topic in self.subHandlers:
            ## FIXME: handle dups!
            ## for now a duplicate is only logged and then registered again
            log.msg("DUPLICATE TOPIC PERMISSION !!! " + topic)
        self.registerForPubSub(topic, p['prefix'], pubsub)
@exportRpc("authreq")
def authRequest(self, authKey = None, extra = None):
    """
    RPC endpoint for clients to initiate the authentication handshake.

    On success the pending authentication is remembered in
    ``self._clientPendingAuth`` as a tuple ``(info, signature, permissions)``.

    :param authKey: Authentication key, such as user name or application name.
                    A false value (None or empty string) is handled as an
                    anonymous session.
    :type authKey: str
    :param extra: Authentication extra information.
    :type extra: dict

    :returns Deferred -- Fires with the authentication challenge (the serialized
             challenge info) from which the client needs to create an authentication
             signature, or with None for an anonymous session.
    :raises Exception: With ``(error URI, description)`` as arguments when the
             session is already authenticated, an authentication is already
             pending, an argument has the wrong type, or anonymous
             authentication is requested but not allowed.
    """

    ## check authentication state
    ##
    if self._clientAuthenticated:
        raise Exception(self.shrink(WampProtocol.URI_WAMP_ERROR + "already-authenticated"), "already authenticated")
    if self._clientPendingAuth is not None:
        raise Exception(self.shrink(WampProtocol.URI_WAMP_ERROR + "authentication-already-requested"), "authentication request already issues - authentication pending")

    ## check extra: any false value (None, empty dict, ..) is normalized to {}
    ##
    if extra:
        if type(extra) != dict:
            raise Exception(self.shrink(WampProtocol.URI_WAMP_ERROR + "invalid-argument"), "extra not a dictionary (was %s)." % str(type(extra)))
    else:
        extra = {}
    ## NOTE: validation of the attribute values in extra (primitive types only)
    ## is currently disabled:
    #for k in extra:
    #   if type(extra[k]) not in [str, unicode, int, long, float, bool, types.NoneType]:
    #      raise Exception(self.shrink(WampProtocol.URI_WAMP_ERROR + "invalid-argument"), "attribute '%s' in extra not a primitive type (was %s)" % (k, str(type(extra[k]))))

    ## check authKey
    ##
    if authKey is None and not self.clientAuthAllowAnonymous:
        raise Exception(self.shrink(WampProtocol.URI_WAMP_ERROR + "anonymous-auth-forbidden"), "authentication as anonymous forbidden")

    if type(authKey) not in [str, unicode, types.NoneType]:
        raise Exception(self.shrink(WampProtocol.URI_WAMP_ERROR + "invalid-argument"), "authentication key must be a string (was %s)" % str(type(authKey)))

    ## getAuthSecret may return a plain value or a Deferred
    d = maybeDeferred(self.getAuthSecret, authKey)

    def onGetAuthSecretOk(authSecret, authKey, extra):
        ## a key was given, but no secret is known for it
        if authKey is not None and authSecret is None:
            raise Exception(self.shrink(WampProtocol.URI_WAMP_ERROR + "no-such-authkey"), "authentication key '%s' does not exist." % authKey)

        ## each authentication request gets a unique authid, which can only be used (later) once!
        ##
        authid = newid()

        ## create authentication challenge
        ##
        info = {'authid': authid, 'authkey': authKey, 'timestamp': utcnow(), 'sessionid': self.session_id,
                'extra': extra}

        ## getAuthPermissions may return a plain value or a Deferred
        pp = maybeDeferred(self.getAuthPermissions, authKey, extra)

        def onAuthPermissionsOk(res):
            ## no permissions object means: empty permissions
            if res is None:
                res = {'permissions': {'pubsub': [], 'rpc': []}}
            info['permissions'] = res['permissions']
            if 'authextra' in res:
                info['authextra'] = res['authextra']

            if authKey:
                ## authenticated session: the serialized info is the challenge,
                ## the expected signature is computed and stored right away
                ##
                infoser = self.factory._serialize(info)
                sig = self.authSignature(infoser, authSecret)

                self._clientPendingAuth = (info, sig, res)
                return infoser
            else:
                ## anonymous session: no challenge and no expected signature
                ##
                self._clientPendingAuth = (info, None, res)
                return None

        def onAuthPermissionsError(e):
            ## wrap any failure from getAuthPermissions into an exception
            ## carrying an error URI and the stringified failure
            raise Exception(self.shrink(WampProtocol.URI_WAMP_ERROR + "auth-permissions-error"), str(e))

        pp.addCallbacks(onAuthPermissionsOk, onAuthPermissionsError)

        ## returning a Deferred from a callback chains its result into d
        return pp

    d.addCallback(onGetAuthSecretOk, authKey, extra)
    return d
1922 """ 1923 1924 ## check authentication state 1925 ## 1926 if self._clientAuthenticated: 1927 raise Exception(self.shrink(WampProtocol.URI_WAMP_ERROR + "already-authenticated"), "already authenticated") 1928 if self._clientPendingAuth is None: 1929 raise Exception(self.shrink(WampProtocol.URI_WAMP_ERROR + "no-authentication-requested"), "no authentication previously requested") 1930 1931 ## check signature 1932 ## 1933 if type(signature) not in [str, unicode, types.NoneType]: 1934 raise Exception(self.shrink(WampProtocol.URI_WAMP_ERROR + "invalid-argument"), "signature must be a string or None (was %s)" % str(type(signature))) 1935 if self._clientPendingAuth[1] != signature: 1936 ## delete pending authentication, so that no retries are possible. authid is only valid for 1 try!! 1937 ## 1938 self._clientPendingAuth = None 1939 1940 ## notify the client of failed authentication, but only after a random, 1941 ## exponentially distributed delay. this (further) protects against 1942 ## timing attacks 1943 ## 1944 d = Deferred() 1945 def fail(): 1946 ## FIXME: (optionally) drop the connection instead of returning RPC error? 1947 ## 1948 d.errback(Exception(self.shrink(WampProtocol.URI_WAMP_ERROR + "invalid-signature"), "signature for authentication request is invalid")) 1949 failDelaySecs = random.expovariate(1.0 / 0.8) # mean = 0.8 secs 1950 self.factory.reactor.callLater(failDelaySecs, fail) 1951 return d 1952 1953 ## at this point, the client has successfully authenticated! 
class Call:
    """
    Thin-wrapper for incoming RPCs provided to call handlers registered via

      - registerHandlerMethodForRpc
      - registerHandlerProcedureForRpc
    """

    def __init__(self,
                 proto,
                 callid,
                 uri,
                 args,
                 extra = None):
        """
        Store the call data. A timings tracker is only created when
        ``proto.trackTimings`` is set, otherwise ``timings`` is None.
        """
        self.proto, self.callid, self.uri = proto, callid, uri
        self.args, self.extra = args, extra
        self.timings = Tracker(tracker=None, tracked=None) if proto.trackTimings else None

    def track(self, key):
        """
        Forward ``key`` to the timings tracker, if there is one.
        """
        tracker = self.timings
        if not tracker:
            return
        tracker.track(key)
2035 """ 2036 msgtype = msg_parts[0] 2037 if self.typeid: 2038 assert msgtype == self.typeid, \ 2039 "Message type %s does not match type id %s" % (msgtype, 2040 self.typeid) 2041 else: 2042 assert False, \ 2043 "No typeid defined for %s" % self.__class__.__name__ 2044 2045 if self._messageIsValid(msg_parts): 2046 self._parseMessageParts(msg_parts) 2047 self._handleMessage() 2048 2049 2050 def _parseMessageParts(self, msg_parts): 2051 """ 2052 Assign the message parts to instance variables. 2053 Has to be overridden in subclasses. 2054 """ 2055 raise NotImplementedError 2056 2057 def _messageIsValid(self, msg_parts): 2058 """ 2059 Check if the message parts have expected properties (type, etc.). 2060 Has to be overridden in subclasses. 2061 """ 2062 raise NotImplementedError 2063 2064 2065 def _handleMessage(self): 2066 """ 2067 Handle a specific kind of message. 2068 Has to be overridden in subclasses. 2069 """ 2070 raise NotImplementedError 2071 2072 2073 2074class CallHandler(Handler): 2075 """ 2076 A handler for incoming RPC calls. 2077 """ 2078 2079 typeid = WampProtocol.MESSAGE_TYPEID_CALL 2080 2081 2082 def _messageIsValid(self, msg_parts): 2083 callid, uri = msg_parts[1:3] 2084 if not isinstance(callid, (str, unicode)): 2085 self.proto._protocolError( 2086 ("WAMP CALL message with invalid type %s for " 2087 "<callid>") % type(callid)) 2088 return False 2089 2090 if not isinstance(uri, (str, unicode)): 2091 self.proto._protocolError( 2092 ("WAMP CALL message with invalid type %s for " 2093 "<uri>") % type(uri)) 2094 return False 2095 2096 return True 2097 2098 2099 def _parseMessageParts(self, msg_parts): 2100 """ 2101 Parse message and create call object. 2102 """ 2103 self.callid = msg_parts[1] 2104 self.uri = self.prefixes.resolveOrPass(msg_parts[2]) ### PFX - remove 2105 self.args = msg_parts[3:] 2106 2107 2108 def _handleMessage(self): 2109 """ 2110 Perform the RPC call and attach callbacks to its deferred object. 
2111 """ 2112 call = self._onBeforeCall() 2113 2114 ## execute incoming RPC 2115 d = maybeDeferred(self._callProcedure, call) 2116 2117 ## register callback and errback with extra argument call 2118 d.addCallbacks(self._onAfterCallSuccess, 2119 self._onAfterCallError, 2120 callbackArgs = (call,), 2121 errbackArgs = (call,)) 2122 2123 2124 def _onBeforeCall(self): 2125 """ 2126 Create call object to move around call data 2127 """ 2128 uri, args = self.proto.onBeforeCall(self.callid, self.uri, self.args, bool(self.proto.procForUri(self.uri))) 2129 2130 call = Call(self.proto, self.callid, uri, args) 2131 call.track("onBeforeCall") 2132 return call 2133 2134 2135 def _callProcedure(self, call): 2136 """ 2137 Actually performs the call of a procedure invoked via RPC. 2138 """ 2139 m = self.proto.procForUri(call.uri) 2140 if m is None: 2141 raise Exception(WampProtocol.URI_WAMP_ERROR_NO_SUCH_RPC_ENDPOINT, "No RPC endpoint registered for %s." % call.uri) 2142 2143 obj, method_or_proc, is_handler = m[:3] 2144 if not is_handler: 2145 return self._performProcedureCall(call, obj, method_or_proc) 2146 else: 2147 call.extra = m[3] 2148 return self._delegateToRpcHandler(call, obj, method_or_proc) 2149 2150 2151 def _performProcedureCall(self, call, obj, method_or_proc): 2152 """ 2153 Perform a RPC method / procedure call. 2154 """ 2155 cargs = tuple(call.args) if call.args else () 2156 if obj: 2157 ## call object method 2158 return method_or_proc(obj, *cargs) 2159 else: 2160 ## call free-standing function/procedure 2161 return method_or_proc(*cargs) 2162 2163 2164 def _delegateToRpcHandler(self, call, obj, method_or_proc): 2165 """ 2166 Delegate call to RPC handler. 2167 """ 2168 if obj: 2169 ## call RPC handler on object 2170 return method_or_proc(obj, call) 2171 else: 2172 ## call free-standing RPC handler 2173 return method_or_proc(call) 2174 2175 2176 def _onAfterCallSuccess(self, result, call): 2177 """ 2178 Execute custom success handler and send call result. 
2179 """ 2180 ## track timing and fire user callback 2181 call.track("onAfterCallSuccess") 2182 call.result = self.proto.onAfterCallSuccess(result, call) 2183 2184 ## send out WAMP message 2185 self._sendCallResult(call) 2186 2187 2188 def _onAfterCallError(self, error, call): 2189 """ 2190 Execute custom error handler and send call error. 2191 """ 2192 ## track timing and fire user callback 2193 call.track("onAfterCallError") 2194 call.error = self.proto.onAfterCallError(error, call) 2195 2196 ## send out WAMP message 2197 self._sendCallError(call) 2198 2199 2200 def _sendCallResult(self, call): 2201 """ 2202 Marshal and send a RPC success result. 2203 """ 2204 msg = [WampProtocol.MESSAGE_TYPEID_CALL_RESULT, call.callid, call.result] 2205 try: 2206 rmsg = self.proto.serializeMessage(msg) 2207 except: 2208 raise Exception("call result not JSON serializable") 2209 else: 2210 ## now actually send WAMP message 2211 self.proto.sendMessage(rmsg) 2212 2213 ## track timing and fire user callback 2214 call.track("onAfterSendCallSuccess") 2215 self.proto.onAfterSendCallSuccess(rmsg, call) 2216 2217 2218 def _sendCallError(self, call): 2219 """ 2220 Marshal and send a RPC error result. 
2221 """ 2222 killsession = False 2223 rmsg = None 2224 try: 2225 error_info, killsession = self._extractErrorInfo(call) 2226 rmsg = self._assembleErrorMessage(call, *error_info) 2227 except Exception as e: 2228 rmsg = self._handleProcessingError(call, e) 2229 finally: 2230 if rmsg: 2231 ## now actually send WAMP message 2232 self.proto.sendMessage(rmsg) 2233 2234 ## track timing and fire user callback 2235 call.track("onAfterSendCallError") 2236 self.proto.onAfterSendCallError(rmsg, call) 2237 2238 if killsession: 2239 self.proto.sendClose(3000, "killing WAMP session upon request by application exception") 2240 else: 2241 raise Exception("fatal: internal error in CallHandler._sendCallError") 2242 2243 2244 def _extractErrorInfo(self, call): 2245 """ 2246 Extract error information from the call. 2247 """ 2248 ## get error args and len 2249 ## 2250 eargs = call.error.value.args 2251 nargs = len(eargs) 2252 2253 if nargs > 4: 2254 raise Exception("invalid args length %d for exception" % nargs) 2255 2256 ## erroruri & errordesc 2257 ## 2258 if nargs == 0: 2259 erroruri = WampProtocol.URI_WAMP_ERROR_GENERIC 2260 errordesc = WampProtocol.DESC_WAMP_ERROR_GENERIC 2261 elif nargs == 1: 2262 erroruri = WampProtocol.URI_WAMP_ERROR_GENERIC 2263 errordesc = eargs[0] 2264 else: 2265 erroruri = eargs[0] 2266 errordesc = eargs[1] 2267 2268 ## errordetails 2269 ## 2270 errordetails = None 2271 if nargs >= 3: 2272 errordetails = eargs[2] 2273 elif self.proto.includeTraceback: 2274 try: 2275 ## we'd like to do .. 2276 #tb = call.error.getTraceback() 2277 2278 ## .. but the implementation in Twisted 2279 ## http://twistedmatrix.com/trac/browser/tags/releases/twisted-13.1.0/twisted/python/failure.py#L529 2280 ## uses cStringIO which cannot handle Unicode string in tracebacks. 
Hence we do our own: 2281 io = StringIO.StringIO() 2282 call.error.printTraceback(file = io) 2283 tb = io.getvalue() 2284 2285 except Exception as ie: 2286 print("INTERNAL ERROR [_extractErrorInfo / getTraceback()]: %s" % ie) 2287 traceback.print_stack() 2288 else: 2289 errordetails = tb.splitlines() 2290 2291 ## killsession 2292 ## 2293 killsession = False 2294 if nargs >= 4: 2295 killsession = eargs[3] 2296 2297 ## recheck all error component types 2298 ## 2299 if type(erroruri) not in [str, unicode]: 2300 raise Exception("invalid type %s for errorUri" % type(erroruri)) 2301 2302 if type(errordesc) not in [str, unicode]: 2303 raise Exception("invalid type %s for errorDesc" % type(errordesc)) 2304 2305 ## errordetails must be JSON serializable. If not, we get exception later in sendMessage. 2306 ## We don't check here, since the only way would be to serialize to JSON and 2307 ## then we'd serialize twice (here and in sendMessage) 2308 2309 if type(killsession) not in [bool, types.NoneType]: 2310 raise Exception("invalid type %s for killSession" % type(killsession)) 2311 2312 return (erroruri, errordesc, errordetails), killsession 2313 2314 2315 def _assembleErrorMessage(self, call, erroruri, errordesc, errordetails): 2316 """ 2317 Assemble a WAMP RPC error message. 2318 """ 2319 if errordetails is not None: 2320 msg = [WampProtocol.MESSAGE_TYPEID_CALL_ERROR, 2321 call.callid, 2322 self.prefixes.shrink(erroruri), ### PFX - remove 2323 errordesc, 2324 errordetails] 2325 else: 2326 msg = [WampProtocol.MESSAGE_TYPEID_CALL_ERROR, 2327 call.callid, 2328 self.prefixes.shrink(erroruri), ### PFX - remove 2329 errordesc] 2330 2331 ## serialize message. 
this can fail if errorDetails is not 2332 ## serializable 2333 try: 2334 rmsg = self.proto.serializeMessage(msg) 2335 except Exception as e: 2336 raise Exception( 2337 "invalid object for errorDetails - not serializable (%s)" % 2338 str(e)) 2339 2340 return rmsg 2341 2342 2343 def _handleProcessingError(self, call, e): 2344 """ 2345 Create a message describing what went wrong during processing an 2346 exception. 2347 """ 2348 msg = [WampProtocol.MESSAGE_TYPEID_CALL_ERROR, 2349 call.callid, 2350 ### PFX - remove 2351 self.prefixes.shrink(WampProtocol.URI_WAMP_ERROR_INTERNAL), 2352 str(e)] 2353 2354 if self.proto.includeTraceback: 2355 try: 2356 tb = call.error.getTraceback() 2357 except Exception as ie: 2358 ## FIXME: find out why this can fail with 2359 ## "'unicode' does not have the buffer interface" 2360 print("INTERNAL ERROR (getTraceback): %s" % ie) 2361 else: 2362 msg.append(tb.splitlines()) 2363 2364 result = self.proto.serializeMessage(msg) 2365 return result 2366 2367 2368 2369 2370class CallResultHandler(Handler): 2371 """ 2372 A handler for to RPC call results. 2373 """ 2374 2375 typeid = WampProtocol.MESSAGE_TYPEID_CALL_RESULT 2376 2377 2378 def _messageIsValid(self, msg_parts): 2379 if len(msg_parts) < 2: 2380 self.proto._protocolError( 2381 "WAMP CALL_RESULT message without <callid>") 2382 return False 2383 if len(msg_parts) != 3: 2384 self.proto._protocolError( 2385 "WAMP CALL_RESULT message with invalid length %d" % len(msg_parts)) 2386 return False 2387 2388 if type(msg_parts[1]) not in [unicode, str]: 2389 self.proto._protocolError( 2390 ("WAMP CALL_RESULT message with invalid type %s for " 2391 "<callid>") % type(msg_parts[1])) 2392 return False 2393 2394 return True 2395 2396 2397 def _parseMessageParts(self, msg_parts): 2398 """ 2399 Extract call result from message parts. 
2400 """ 2401 self.callid = str(msg_parts[1]) 2402 self.result = msg_parts[2] 2403 2404 2405 def _handleMessage(self): 2406 ## Pop and process Call Deferred 2407 ## 2408 d = self.proto.calls.pop(self.callid, None) 2409 if d: 2410 ## WAMP CALL_RESULT 2411 ## 2412 d.callback(self.result) 2413 else: 2414 if self.proto.debugWamp: 2415 log.msg("callid not found for received call result message") 2416 2417 2418 2419class CallErrorHandler(Handler): 2420 2421 typeid = WampProtocol.MESSAGE_TYPEID_CALL_ERROR 2422 2423 2424 def _messageIsValid(self, msg_parts): 2425 if len(msg_parts) not in [4, 5]: 2426 self.proto._protocolError( 2427 "call error message invalid length %d" % len(msg_parts)) 2428 return False 2429 2430 ## Error URI 2431 ## 2432 if type(msg_parts[2]) not in [unicode, str]: 2433 self.proto._protocolError( 2434 "invalid type %s for errorUri in call error message" % 2435 str(type(msg_parts[2]))) 2436 return False 2437 2438 ## Error Description 2439 ## 2440 if type(msg_parts[3]) not in [unicode, str]: 2441 self.proto._protocolError( 2442 "invalid type %s for errorDesc in call error message" % 2443 str(type(msg_parts[3]))) 2444 return False 2445 2446 return True 2447 2448 2449 def _parseMessageParts(self, msg_parts): 2450 """ 2451 Extract error information from message parts. 2452 """ 2453 self.callid = str(msg_parts[1]) 2454 self.erroruri = str(msg_parts[2]) 2455 self.errordesc = str(msg_parts[3]) 2456 2457 ## Error Details 2458 ## 2459 if len(msg_parts) > 4: 2460 self.errordetails = msg_parts[4] 2461 else: 2462 self.errordetails = None 2463 2464 2465 def _handleMessage(self): 2466 """ 2467 Fire Call Error Deferred. 2468 """ 2469 ## 2470 ## Pop and process Call Deferred 2471 d = self.proto.calls.pop(self.callid, None) 2472 if d: 2473 e = Exception(self.erroruri, self.errordesc, self.errordetails) 2474 d.errback(e) 2475 else: 2476 if self.proto.debugWamp: 2477 log.msg("callid not found for received call error message") 2478