1###############################################################################
2##
3##  Copyright (C) 2011-2013 Tavendo GmbH
4##
5##  Licensed under the Apache License, Version 2.0 (the "License");
6##  you may not use this file except in compliance with the License.
7##  You may obtain a copy of the License at
8##
9##      http://www.apache.org/licenses/LICENSE-2.0
10##
11##  Unless required by applicable law or agreed to in writing, software
12##  distributed under the License is distributed on an "AS IS" BASIS,
13##  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14##  See the License for the specific language governing permissions and
15##  limitations under the License.
16##
17###############################################################################
18
19from __future__ import absolute_import
20
21import sys
22PY3 = sys.version_info >= (3,)
23
24
25__all__ = ("WampProtocol",
26           "WampFactory",
27           "WampServerProtocol",
28           "WampServerFactory",
29           "WampClientProtocol",
30           "WampClientFactory",
31           "WampCraProtocol",
32           "WampCraClientProtocol",
33           "WampCraServerProtocol",
34           "json_lib",
35           "json_loads",
36           "json_dumps",)
37
38
39import inspect, types
40import traceback
41
42if PY3:
43   from io import StringIO
44else:
45   import StringIO
46
47import hashlib, hmac, binascii, random
48
49from twisted.python import log
50from twisted.internet.defer import Deferred, \
51                                   maybeDeferred
52
53from autobahn import __version__
54
55from autobahn.websocket.protocol import WebSocketProtocol
56
57from autobahn.websocket import http
58from autobahn.twisted.websocket import WebSocketClientProtocol, \
59                                       WebSocketClientFactory, \
60                                       WebSocketServerFactory, \
61                                       WebSocketServerProtocol
62from autobahn.wamp1.pbkdf2 import pbkdf2_bin
63from autobahn.wamp1.prefixmap import PrefixMap
64from autobahn.util import utcnow, newid, Tracker
65
66
67def exportRpc(arg = None):
68   """
69   Decorator for RPC'ed callables.
70   """
71   ## decorator without argument
72   if type(arg) is types.FunctionType:
73      arg._autobahn_rpc_id = arg.__name__
74      return arg
75   ## decorator with argument
76   else:
77      def inner(f):
78         f._autobahn_rpc_id = arg
79         return f
80      return inner
81
82def exportSub(arg, prefixMatch = False):
83   """
84   Decorator for subscription handlers.
85   """
86   def inner(f):
87      f._autobahn_sub_id = arg
88      f._autobahn_sub_prefix_match = prefixMatch
89      return f
90   return inner
91
92def exportPub(arg, prefixMatch = False):
93   """
94   Decorator for publication handlers.
95   """
96   def inner(f):
97      f._autobahn_pub_id = arg
98      f._autobahn_pub_prefix_match = prefixMatch
99      return f
100   return inner
101
102
103class WampProtocol:
104   """
105   WAMP protocol base class. Mixin for WampServerProtocol and WampClientProtocol.
106   """
107
108   URI_WAMP_BASE = "http://api.wamp.ws/"
109   """
110   WAMP base URI for WAMP predefined things.
111   """
112
113   URI_WAMP_ERROR = URI_WAMP_BASE + "error#"
114   """
115   Prefix for WAMP errors.
116   """
117
118   URI_WAMP_PROCEDURE = URI_WAMP_BASE + "procedure#"
119   """
120   Prefix for WAMP predefined RPC endpoints.
121   """
122
123   URI_WAMP_TOPIC = URI_WAMP_BASE + "topic#"
124   """
125   Prefix for WAMP predefined PubSub topics.
126   """
127
128   URI_WAMP_ERROR_GENERIC = URI_WAMP_ERROR + "generic"
129   """
130   WAMP error URI for generic errors.
131   """
132
133   DESC_WAMP_ERROR_GENERIC = "generic error"
134   """
135   Description for WAMP generic errors.
136   """
137
138   URI_WAMP_ERROR_INTERNAL = URI_WAMP_ERROR + "internal"
139   """
140   WAMP error URI for internal errors.
141   """
142
143   DESC_WAMP_ERROR_INTERNAL = "internal error"
144   """
145   Description for WAMP internal errors.
146   """
147
148   URI_WAMP_ERROR_NO_SUCH_RPC_ENDPOINT = URI_WAMP_ERROR + "NoSuchRPCEndpoint"
149   """
150   WAMP error URI for RPC endpoint not found.
151   """
152
153   WAMP_PROTOCOL_VERSION         = 1
154   """
155   WAMP version this server speaks. Versions are numbered consecutively
156   (integers, no gaps).
157   """
158
159   MESSAGE_TYPEID_WELCOME        = 0
160   """
161   Server-to-client welcome message containing session ID.
162   """
163
164   MESSAGE_TYPEID_PREFIX         = 1
165   """
166   Client-to-server message establishing a URI prefix to be used in CURIEs.
167   """
168
169   MESSAGE_TYPEID_CALL           = 2
170   """
171   Client-to-server message initiating an RPC.
172   """
173
174   MESSAGE_TYPEID_CALL_RESULT    = 3
175   """
176   Server-to-client message returning the result of a successful RPC.
177   """
178
179   MESSAGE_TYPEID_CALL_ERROR     = 4
180   """
181   Server-to-client message returning the error of a failed RPC.
182   """
183
184   MESSAGE_TYPEID_SUBSCRIBE      = 5
185   """
186   Client-to-server message subscribing to a topic.
187   """
188
189   MESSAGE_TYPEID_UNSUBSCRIBE    = 6
190   """
191   Client-to-server message unsubscribing from a topic.
192   """
193
194   MESSAGE_TYPEID_PUBLISH        = 7
195   """
196   Client-to-server message publishing an event to a topic.
197   """
198
199   MESSAGE_TYPEID_EVENT          = 8
200   """
201   Server-to-client message providing the event of a (subscribed) topic.
202   """
203
204   def connectionMade(self):
205      self.debugWamp = self.factory.debugWamp
206      self.debugApp = self.factory.debugApp
207      self.prefixes = PrefixMap()
208      self.calls = {}
209      self.procs = {}
210
211
212   def connectionLost(self, reason):
213      pass
214
215
216   def _protocolError(self, reason):
217      if self.debugWamp:
218         log.msg("Closing Wamp session on protocol violation : %s" % reason)
219
220      ## FIXME: subprotocols are probably not supposed to close with CLOSE_STATUS_CODE_PROTOCOL_ERROR
221      ##
222      self.protocolViolation("Wamp RPC/PubSub protocol violation ('%s')" % reason)
223
224
225   def shrink(self, uri, passthrough = False):
226      """
227      Shrink given URI to CURIE according to current prefix mapping.
228      If no appropriate prefix mapping is available, return original URI.
229
230      :param uri: URI to shrink.
231      :type uri: str
232
233      :returns str -- CURIE or original URI.
234      """
235      return self.prefixes.shrink(uri)
236
237
238   def resolve(self, curieOrUri, passthrough = False):
239      """
240      Resolve given CURIE/URI according to current prefix mapping or return
241      None if cannot be resolved.
242
243      :param curieOrUri: CURIE or URI.
244      :type curieOrUri: str
245
246      :returns: str -- Full URI for CURIE or None.
247      """
248      return self.prefixes.resolve(curieOrUri)
249
250
251   def resolveOrPass(self, curieOrUri):
252      """
253      Resolve given CURIE/URI according to current prefix mapping or return
254      string verbatim if cannot be resolved.
255
256      :param curieOrUri: CURIE or URI.
257      :type curieOrUri: str
258
259      :returns: str -- Full URI for CURIE or original string.
260      """
261      return self.prefixes.resolveOrPass(curieOrUri)
262
263
264   def serializeMessage(self, msg):
265      """
266      Delegate message serialization to the factory.
267      :param msg: The message to be serialized.
268      :type msg: str
269      :return: The serialized message.
270      """
271      return self.factory._serialize(msg)
272
273
274   def registerForRpc(self, obj, baseUri = "", methods = None):
275      """
276      Register an service object for RPC. A service object has methods
277      which are decorated using @exportRpc.
278
279      :param obj: The object to be registered (in this WebSockets session) for RPC.
280      :type obj: Object with methods decorated using @exportRpc.
281      :param baseUri: Optional base URI which is prepended to method names for export.
282      :type baseUri: String.
283      :param methods: If not None, a list of unbound class methods corresponding to obj
284                     which should be registered. This can be used to register only a subset
285                     of the methods decorated with @exportRpc.
286      :type methods: List of unbound class methods.
287      """
288      for k in inspect.getmembers(obj.__class__, inspect.ismethod):
289         if k[1].__dict__.has_key("_autobahn_rpc_id"):
290            if methods is None or k[1] in methods:
291               uri = baseUri + k[1].__dict__["_autobahn_rpc_id"]
292               proc = k[1]
293               self.registerMethodForRpc(uri, obj, proc)
294
295
296   def registerMethodForRpc(self, uri, obj, proc):
297      """
298      Register a method of an object for RPC.
299
300      :param uri: URI to register RPC method under.
301      :type uri: str
302      :param obj: The object on which to register a method for RPC.
303      :type obj: object
304      :param proc: Unbound object method to register RPC for.
305      :type proc: unbound method
306      """
307      self.procs[uri] = (obj, proc, False)
308      if self.debugWamp:
309         log.msg("registered remote method on %s" % uri)
310
311
312   def registerProcedureForRpc(self, uri, proc):
313      """
314      Register a (free standing) function/procedure for RPC.
315
316      :param uri: URI to register RPC function/procedure under.
317      :type uri: str
318      :param proc: Free-standing function/procedure.
319      :type proc: callable
320      """
321      self.procs[uri] = (None, proc, False)
322      if self.debugWamp:
323         log.msg("registered remote procedure on %s" % uri)
324
325
326   def registerHandlerMethodForRpc(self, uri, obj, handler, extra = None):
327      """
328      Register a handler on an object for RPC.
329
330      :param uri: URI to register RPC method under.
331      :type uri: str
332      :param obj: The object on which to register the RPC handler
333      :type obj: object
334      :param proc: Unbound object method to register RPC for.
335      :type proc: unbound method
336      :param extra: Optional extra data that will be given to the handler at call time.
337      :type extra: object
338      """
339      self.procs[uri] = (obj, handler, True, extra)
340      if self.debugWamp:
341         log.msg("registered remote handler method on %s" % uri)
342
343
344   def registerHandlerProcedureForRpc(self, uri, handler, extra = None):
345      """
346      Register a (free standing) handler for RPC.
347
348      :param uri: URI to register RPC handler under.
349      :type uri: str
350      :param proc: Free-standing handler
351      :type proc: callable
352      :param extra: Optional extra data that will be given to the handler at call time.
353      :type extra: object
354      """
355      self.procs[uri] = (None, handler, True, extra)
356      if self.debugWamp:
357         log.msg("registered remote handler procedure on %s" % uri)
358
359
360   def procForUri(self, uri):
361      """
362      Returns the procedure specification for `uri` or None, if it does not exist.
363
364      :param uri: URI to be checked.
365      :type uri: str
366      :returns: The procedure specification for `uri`, if it exists,
367                `None` otherwise.
368      """
369      return self.procs[uri] if uri in self.procs else None
370
371
372   def onBeforeCall(self, callid, uri, args, isRegistered):
373      """
374      Callback fired before executing incoming RPC. This can be used for
375      logging, statistics tracking or redirecting RPCs or argument mangling i.e.
376
377      The default implementation just returns the incoming URI/args.
378
379      :param uri: RPC endpoint URI (fully-qualified).
380      :type uri: str
381      :param args: RPC arguments array.
382      :type args: list
383      :param isRegistered: True, iff RPC endpoint URI is registered in this session.
384      :type isRegistered: bool
385      :returns pair -- Must return URI/Args pair.
386      """
387      return uri, args
388
389
390   def onAfterCallSuccess(self, result, call):
391      """
392      Callback fired after executing incoming RPC with success, but before
393      sending the RPC success message.
394
395      The default implementation will just return `result` to the client.
396
397      :param result: Result returned for executing the incoming RPC.
398      :type result: Anything returned by the user code for the endpoint.
399      :param call: WAMP call object for incoming RPC.
400      :type call: instance of Call
401      :returns obj -- Result send back to client.
402      """
403      return result
404
405
406   def onAfterCallError(self, error, call):
407      """
408      Callback fired after executing incoming RPC with failure, but before
409      sending the RPC error message.
410
411      The default implementation will just return `error` to the client.
412
413      :param error: Error that occurred during incomnig RPC call execution.
414      :type error: Instance of twisted.python.failure.Failure
415      :param call: WAMP call object for incoming RPC.
416      :type call: instance of Call
417      :returns twisted.python.failure.Failure -- Error sent back to client.
418      """
419      return error
420
421
422   def onAfterSendCallSuccess(self, msg, call):
423      """
424      Callback fired after sending RPC success message.
425
426      :param msg: Serialized WAMP message.
427      :type msg: str
428      :param call: WAMP call object for incoming RPC.
429      :type call: instance of Call
430      """
431      pass
432
433
434   def onAfterSendCallError(self, msg, call):
435      """
436      Callback fired after sending RPC error message.
437
438      :param msg: Serialized WAMP message.
439      :type msg: str
440      :param call: WAMP call object for incoming RPC.
441      :type call: instance of Call
442      """
443      pass
444
445
446   def call(self, *args):
447      """
448      Perform a remote-procedure call (RPC). The first argument is the procedure
449      URI (mandatory). Subsequent positional arguments can be provided (must be
450      JSON serializable). The return value is a Twisted Deferred.
451      """
452
453      if len(args) < 1:
454         raise Exception("missing procedure URI")
455
456      if type(args[0]) not in [unicode, str]:
457         raise Exception("invalid type for procedure URI")
458
459      procuri = args[0]
460      callid = None
461      while True:
462         callid = newid()
463         if not self.calls.has_key(callid):
464            break
465      d = Deferred()
466      self.calls[callid] = d
467      msg = [WampProtocol.MESSAGE_TYPEID_CALL, callid, procuri]
468      msg.extend(args[1:])
469
470      try:
471         o = self.factory._serialize(msg)
472      except:
473         raise Exception("call argument(s) not JSON serializable")
474
475      self.sendMessage(o)
476      return d
477
478
479
480## use Ultrajson (https://github.com/esnme/ultrajson) if available
481##
482try:
483   import ujson
484   json_lib = ujson
485   json_loads = ujson.loads
486   json_dumps = lambda x: ujson.dumps(x, ensure_ascii = False)
487except:
488   import json
489   json_lib = json
490   json_loads = json.loads
491   json_dumps = json.dumps
492
493
494
495class WampFactory:
496   """
497   WAMP factory base class. Mixin for WampServerFactory and WampClientFactory.
498   """
499
500   def __init__(self):
501      if self.debugWamp:
502         log.msg("Using JSON processor '%s'" % json_lib.__name__)
503
504
505   def _serialize(self, obj):
506      """
507      Default object serializer.
508      """
509      return json_dumps(obj)
510
511
512   def _unserialize(self, bytes):
513      """
514      Default object deserializer.
515      """
516      return json_loads(bytes)
517
518
519
520class WampServerProtocol(WebSocketServerProtocol, WampProtocol):
521   """
522   Server factory for Wamp RPC/PubSub.
523   """
524
525   SUBSCRIBE = 1
526   PUBLISH = 2
527
528   def onSessionOpen(self):
529      """
530      Callback fired when WAMP session was fully established.
531      """
532      pass
533
534
535   def onOpen(self):
536      """
537      Default implementation for WAMP connection opened sends
538      Welcome message containing session ID.
539      """
540      self.session_id = newid()
541
542      ## include traceback as error detail for RPC errors with
543      ## no error URI - that is errors returned with URI_WAMP_ERROR_GENERIC
544      self.includeTraceback = False
545
546      msg = [WampProtocol.MESSAGE_TYPEID_WELCOME,
547             self.session_id,
548             WampProtocol.WAMP_PROTOCOL_VERSION,
549             "Autobahn/%s" % __version__]
550      o = self.factory._serialize(msg)
551      self.sendMessage(o)
552
553      self.factory._addSession(self, self.session_id)
554      self.onSessionOpen()
555
556
557   def onConnect(self, connectionRequest):
558      """
559      Default implementation for WAMP connection acceptance:
560      check if client announced WAMP subprotocol, and only accept connection
561      if client did so.
562      """
563      for p in connectionRequest.protocols:
564         if p in self.factory.protocols:
565            return (p, {}) # return (protocol, headers)
566      raise http.HttpException(http.BAD_REQUEST[0], "this server only speaks WAMP")
567
568
569   def connectionMade(self):
570      WebSocketServerProtocol.connectionMade(self)
571      WampProtocol.connectionMade(self)
572
573      ## RPCs registered in this session (a URI map of (object, procedure)
574      ## pairs for object methods or (None, procedure) for free standing procedures)
575      self.procs = {}
576
577      ## Publication handlers registered in this session (a URI map of (object, pubHandler) pairs
578      ## pairs for object methods (handlers) or (None, None) for topic without handler)
579      self.pubHandlers = {}
580
581      ## Subscription handlers registered in this session (a URI map of (object, subHandler) pairs
582      ## pairs for object methods (handlers) or (None, None) for topic without handler)
583      self.subHandlers = {}
584
585      self.handlerMapping = {
586         self.MESSAGE_TYPEID_CALL: CallHandler(self, self.prefixes),
587         self.MESSAGE_TYPEID_CALL_RESULT: CallResultHandler(self, self.prefixes),
588         self.MESSAGE_TYPEID_CALL_ERROR: CallErrorHandler(self, self.prefixes)}
589
590
591   def connectionLost(self, reason):
592      self.factory._unsubscribeClient(self)
593      self.factory._removeSession(self)
594
595      WampProtocol.connectionLost(self, reason)
596      WebSocketServerProtocol.connectionLost(self, reason)
597
598
599   def sendMessage(self,
600                   payload,
601                   binary = False,
602                   payload_frag_size = None,
603                   sync = False,
604                   doNotCompress = False):
605      if self.debugWamp:
606         log.msg("TX WAMP: %s" % str(payload))
607      WebSocketServerProtocol.sendMessage(self,
608                                          payload,
609                                          binary,
610                                          payload_frag_size,
611                                          sync,
612                                          doNotCompress)
613
614
615   def _getPubHandler(self, topicUri):
616      ## Longest matching prefix based resolution of (full) topic URI to
617      ## publication handler.
618      ## Returns a 5-tuple (consumedUriPart, unconsumedUriPart, handlerObj, handlerProc, prefixMatch)
619      ##
620      for i in xrange(len(topicUri), -1, -1):
621         tt = topicUri[:i]
622         if self.pubHandlers.has_key(tt):
623            h = self.pubHandlers[tt]
624            return (tt, topicUri[i:], h[0], h[1], h[2])
625      return None
626
627
628   def _getSubHandler(self, topicUri):
629      ## Longest matching prefix based resolution of (full) topic URI to
630      ## subscription handler.
631      ## Returns a 5-tuple (consumedUriPart, unconsumedUriPart, handlerObj, handlerProc, prefixMatch)
632      ##
633      for i in xrange(len(topicUri), -1, -1):
634         tt = topicUri[:i]
635         if self.subHandlers.has_key(tt):
636            h = self.subHandlers[tt]
637            return (tt, topicUri[i:], h[0], h[1], h[2])
638      return None
639
640
641   def registerForPubSub(self, topicUri, prefixMatch = False, pubsub = PUBLISH | SUBSCRIBE):
642      """
643      Register a topic URI as publish/subscribe channel in this session.
644
645      :param topicUri: Topic URI to be established as publish/subscribe channel.
646      :type topicUri: str
647      :param prefixMatch: Allow to match this topic URI by prefix.
648      :type prefixMatch: bool
649      :param pubsub: Allow publication and/or subscription.
650      :type pubsub: WampServerProtocol.PUB, WampServerProtocol.SUB, WampServerProtocol.PUB | WampServerProtocol.SUB
651      """
652      if pubsub & WampServerProtocol.PUBLISH:
653         self.pubHandlers[topicUri] = (None, None, prefixMatch)
654         if self.debugWamp:
655            log.msg("registered topic %s for publication (match by prefix = %s)" % (topicUri, prefixMatch))
656      if pubsub & WampServerProtocol.SUBSCRIBE:
657         self.subHandlers[topicUri] = (None, None, prefixMatch)
658         if self.debugWamp:
659            log.msg("registered topic %s for subscription (match by prefix = %s)" % (topicUri, prefixMatch))
660
661
662   def registerHandlerForPubSub(self, obj, baseUri = ""):
663      """
664      Register a handler object for PubSub. A handler object has methods
665      which are decorated using @exportPub and @exportSub.
666
667      :param obj: The object to be registered (in this WebSockets session) for PubSub.
668      :type obj: Object with methods decorated using @exportPub and @exportSub.
669      :param baseUri: Optional base URI which is prepended to topic names for export.
670      :type baseUri: String.
671      """
672      for k in inspect.getmembers(obj.__class__, inspect.ismethod):
673         if k[1].__dict__.has_key("_autobahn_pub_id"):
674            uri = baseUri + k[1].__dict__["_autobahn_pub_id"]
675            prefixMatch = k[1].__dict__["_autobahn_pub_prefix_match"]
676            proc = k[1]
677            self.registerHandlerForPub(uri, obj, proc, prefixMatch)
678         elif k[1].__dict__.has_key("_autobahn_sub_id"):
679            uri = baseUri + k[1].__dict__["_autobahn_sub_id"]
680            prefixMatch = k[1].__dict__["_autobahn_sub_prefix_match"]
681            proc = k[1]
682            self.registerHandlerForSub(uri, obj, proc, prefixMatch)
683
684
685   def registerHandlerForSub(self, uri, obj, proc, prefixMatch = False):
686      """
687      Register a method of an object as subscription handler.
688
689      :param uri: Topic URI to register subscription handler for.
690      :type uri: str
691      :param obj: The object on which to register a method as subscription handler.
692      :type obj: object
693      :param proc: Unbound object method to register as subscription handler.
694      :type proc: unbound method
695      :param prefixMatch: Allow to match this topic URI by prefix.
696      :type prefixMatch: bool
697      """
698      self.subHandlers[uri] = (obj, proc, prefixMatch)
699      if not self.pubHandlers.has_key(uri):
700         self.pubHandlers[uri] = (None, None, False)
701      if self.debugWamp:
702         log.msg("registered subscription handler for topic %s" % uri)
703
704
705   def registerHandlerForPub(self, uri, obj, proc, prefixMatch = False):
706      """
707      Register a method of an object as publication handler.
708
709      :param uri: Topic URI to register publication handler for.
710      :type uri: str
711      :param obj: The object on which to register a method as publication handler.
712      :type obj: object
713      :param proc: Unbound object method to register as publication handler.
714      :type proc: unbound method
715      :param prefixMatch: Allow to match this topic URI by prefix.
716      :type prefixMatch: bool
717      """
718      self.pubHandlers[uri] = (obj, proc, prefixMatch)
719      if not self.subHandlers.has_key(uri):
720         self.subHandlers[uri] = (None, None, False)
721      if self.debugWamp:
722         log.msg("registered publication handler for topic %s" % uri)
723
724
725   # noinspection PyDefaultArgument
726   def dispatch(self, topicUri, event, exclude = [], eligible = None):
727      """
728      Dispatch an event for a topic to all clients subscribed to
729      and authorized for that topic.
730
731      Optionally, exclude list of clients and/or only consider clients
732      from explicit eligibles. In other words, the event is delivered
733      to the set
734
735         (subscribers - excluded) & eligible
736
737      :param topicUri: URI of topic to publish event to.
738      :type topicUri: str
739      :param event: Event to dispatch.
740      :type event: obj
741      :param exclude: Optional list of clients (WampServerProtocol instances) to exclude.
742      :type exclude: list of obj
743      :param eligible: Optional list of clients (WampServerProtocol instances) eligible at all (or None for all).
744      :type eligible: list of obj
745
746      :returns twisted.internet.defer.Deferred -- Will be fired when event was
747      dispatched to all subscribers. The return value provided to the deferred
748      is a pair (delivered, requested), where delivered = number of actual
749      receivers, and requested = number of (subscribers - excluded) & eligible.
750      """
751      return self.factory.dispatch(topicUri, event, exclude, eligible)
752
753
754   def onMessage(self, msg, binary):
755      """
756      Handle WAMP messages received from WAMP client.
757      """
758
759      if self.debugWamp:
760         log.msg("RX WAMP: %s" % str(msg))
761
762      if not binary:
763         try:
764            obj = self.factory._unserialize(msg)
765            if type(obj) == list:
766
767               msgtype = obj[0]
768
769               ### XXX Replace check by try...except when all handlers
770               ### XXX are in place. Exception handling should create
771               ### XXX a protocolError message about unsupported
772               ### XXX message type
773               if msgtype in [WampProtocol.MESSAGE_TYPEID_CALL,
774                              WampProtocol.MESSAGE_TYPEID_CALL_RESULT,
775                              WampProtocol.MESSAGE_TYPEID_CALL_ERROR]:
776                  self.handlerMapping[msgtype].handleMessage(obj)
777
778               ### XXX Move remaining code to appropriate handlers
779
780               ## Subscribe Message
781               ##
782               elif msgtype == WampProtocol.MESSAGE_TYPEID_SUBSCRIBE:
783                  topicUri = self.prefixes.resolveOrPass(obj[1]) ### PFX - remove
784                  h = self._getSubHandler(topicUri)
785                  if h:
786                     ## either exact match or prefix match allowed
787                     if h[1] == "" or h[4]:
788
789                        ## direct topic
790                        if h[2] is None and h[3] is None:
791                           self.factory._subscribeClient(self, topicUri)
792
793                        ## topic handled by subscription handler
794                        else:
795                           ## handler is object method
796                           if h[2]:
797                              a = maybeDeferred(h[3], h[2], str(h[0]), str(h[1]))
798
799                           ## handler is free standing procedure
800                           else:
801                              a = maybeDeferred(h[3], str(h[0]), str(h[1]))
802
803                           def fail(failure):
804                              if self.debugWamp:
805                                 log.msg("exception during custom subscription handler: %s" % failure)
806
807                           def done(result):
808                              ## only subscribe client if handler did return True
809                              if result:
810                                 self.factory._subscribeClient(self, topicUri)
811
812                           a.addCallback(done).addErrback(fail)
813                     else:
814                        if self.debugWamp:
815                           log.msg("topic %s matches only by prefix and prefix match disallowed" % topicUri)
816                  else:
817                     if self.debugWamp:
818                        log.msg("no topic / subscription handler registered for %s" % topicUri)
819
820               ## Unsubscribe Message
821               ##
822               elif msgtype == WampProtocol.MESSAGE_TYPEID_UNSUBSCRIBE:
823                  topicUri = self.prefixes.resolveOrPass(obj[1]) ### PFX - remove
824                  self.factory._unsubscribeClient(self, topicUri)
825
826               ## Publish Message
827               ##
828               elif msgtype == WampProtocol.MESSAGE_TYPEID_PUBLISH:
829                  topicUri = self.prefixes.resolveOrPass(obj[1]) ### PFX - remove
830                  h = self._getPubHandler(topicUri)
831                  if h:
832                     ## either exact match or prefix match allowed
833                     if h[1] == "" or h[4]:
834
835                        ## Event
836                        ##
837                        event = obj[2]
838
839                        ## Exclude Sessions List
840                        ##
841                        exclude = [self] # exclude publisher by default
842                        if len(obj) >= 4:
843                           if type(obj[3]) == bool:
844                              if not obj[3]:
845                                 exclude = []
846                           elif type(obj[3]) == list:
847                              ## map session IDs to protos
848                              exclude = self.factory.sessionIdsToProtos(obj[3])
849                           else:
850                              ## FIXME: invalid type
851                              pass
852
853                        ## Eligible Sessions List
854                        ##
855                        eligible = None # all sessions are eligible by default
856                        if len(obj) >= 5:
857                           if type(obj[4]) == list:
858                              ## map session IDs to protos
859                              eligible = self.factory.sessionIdsToProtos(obj[4])
860                           else:
861                              ## FIXME: invalid type
862                              pass
863
864                        ## direct topic
865                        if h[2] is None and h[3] is None:
866                           self.factory.dispatch(topicUri, event, exclude, eligible)
867
868                        ## topic handled by publication handler
869                        else:
870                           ## handler is object method
871                           if h[2]:
872                              e = maybeDeferred(h[3], h[2], str(h[0]), str(h[1]), event)
873
874                           ## handler is free standing procedure
875                           else:
876                              e = maybeDeferred(h[3], str(h[0]), str(h[1]), event)
877
878                           def fail(failure):
879                              if self.debugWamp:
880                                 log.msg("exception during custom publication handler: %s" % failure)
881
882                           def done(result):
883                              ## only dispatch event if handler did return event
884                              if result:
885                                 self.factory.dispatch(topicUri, result, exclude, eligible)
886
887                           e.addCallback(done).addErrback(fail)
888                     else:
889                        if self.debugWamp:
890                           log.msg("topic %s matches only by prefix and prefix match disallowed" % topicUri)
891                  else:
892                     if self.debugWamp:
893                        log.msg("no topic / publication handler registered for %s" % topicUri)
894
895               ## Define prefix to be used in CURIEs
896               ##
897               elif msgtype == WampProtocol.MESSAGE_TYPEID_PREFIX:
898                  prefix = obj[1]
899                  uri = obj[2]
900                  self.prefixes.set(prefix, uri) ### PFX - remove whole block (this msg type won't survive)
901
902               else:
903                  log.msg("unknown message type")
904            else:
905               log.msg("msg not a list")
906         except Exception:
907            traceback.print_exc()
908      else:
909         log.msg("binary message")
910
911
912
913class WampServerFactory(WebSocketServerFactory, WampFactory):
914   """
915   Server factory for Wamp RPC/PubSub.
916   """
917
918   protocol = WampServerProtocol
919   """
920   Twisted protocol used by default for WAMP servers.
921   """
922
923   def __init__(self,
924                url,
925                debug = False,
926                debugCodePaths = False,
927                debugWamp = False,
928                debugApp = False,
929                externalPort = None,
930                reactor = None):
931      self.debugWamp = debugWamp
932      self.debugApp = debugApp
933      WebSocketServerFactory.__init__(self,
934                                      url,
935                                      protocols = ["wamp"],
936                                      debug = debug,
937                                      debugCodePaths = debugCodePaths,
938                                      externalPort = externalPort,
939                                      reactor = reactor)
940      WampFactory.__init__(self)
941
942
943   def onClientSubscribed(self, proto, topicUri):
944      """
945      Callback fired when peer was (successfully) subscribed on some topic.
946
947      :param proto: Peer protocol instance subscribed.
948      :type proto: Instance of WampServerProtocol.
949      :param topicUri: Fully qualified, resolved URI of topic subscribed.
950      :type topicUri: str
951      """
952      pass
953
954
955   def _subscribeClient(self, proto, topicUri):
956      """
957      Called from proto to subscribe client for topic.
958      """
959      if not self.subscriptions.has_key(topicUri):
960         self.subscriptions[topicUri] = set()
961         if self.debugWamp:
962            log.msg("subscriptions map created for topic %s" % topicUri)
963      if not proto in self.subscriptions[topicUri]:
964         self.subscriptions[topicUri].add(proto)
965         if self.debugWamp:
966            log.msg("subscribed peer %s on topic %s" % (proto.peer, topicUri))
967         self.onClientSubscribed(proto, topicUri)
968      else:
969         if self.debugWamp:
970            log.msg("peer %s already subscribed on topic %s" % (proto.peer, topicUri))
971
972
973   def onClientUnsubscribed(self, proto, topicUri):
974      """
975      Callback fired when peer was (successfully) unsubscribed from some topic.
976
977      :param proto: Peer protocol instance unsubscribed.
978      :type proto: Instance of WampServerProtocol.
979      :param topicUri: Fully qualified, resolved URI of topic unsubscribed.
980      :type topicUri: str
981      """
982      pass
983
984
985   def _unsubscribeClient(self, proto, topicUri = None):
986      """
987      Called from proto to unsubscribe client from topic.
988      """
989      if topicUri:
990         if self.subscriptions.has_key(topicUri) and proto in self.subscriptions[topicUri]:
991            self.subscriptions[topicUri].discard(proto)
992            if self.debugWamp:
993               log.msg("unsubscribed peer %s from topic %s" % (proto.peer, topicUri))
994            if len(self.subscriptions[topicUri]) == 0:
995               del self.subscriptions[topicUri]
996               if self.debugWamp:
997                  log.msg("topic %s removed from subscriptions map - no one subscribed anymore" % topicUri)
998            self.onClientUnsubscribed(proto, topicUri)
999         else:
1000            if self.debugWamp:
1001               log.msg("peer %s not subscribed on topic %s" % (proto.peer, topicUri))
1002      else:
1003         for topicUri, subscribers in self.subscriptions.items():
1004            if proto in subscribers:
1005               subscribers.discard(proto)
1006               if self.debugWamp:
1007                  log.msg("unsubscribed peer %s from topic %s" % (proto.peer, topicUri))
1008               if len(subscribers) == 0:
1009                  del self.subscriptions[topicUri]
1010                  if self.debugWamp:
1011                     log.msg("topic %s removed from subscriptions map - no one subscribed anymore" % topicUri)
1012               self.onClientUnsubscribed(proto, topicUri)
1013         if self.debugWamp:
1014            log.msg("unsubscribed peer %s from all topics" % (proto.peer))
1015
1016
1017   # noinspection PyDefaultArgument
1018   def dispatch(self, topicUri, event, exclude = [], eligible = None):
1019      """
1020      Dispatch an event to all peers subscribed to the event topic.
1021
1022      :param topicUri: Topic to publish event to.
1023      :type topicUri: str
1024      :param event: Event to publish (must be JSON serializable).
1025      :type event: obj
1026      :param exclude: List of WampServerProtocol instances to exclude from receivers.
1027      :type exclude: List of obj
1028      :param eligible: List of WampServerProtocol instances eligible as receivers (or None for all).
1029      :type eligible: List of obj
1030
1031      :returns twisted.internet.defer.Deferred -- Will be fired when event was
1032      dispatched to all subscribers. The return value provided to the deferred
1033      is a pair (delivered, requested), where delivered = number of actual
1034      receivers, and requested = number of (subscribers - excluded) & eligible.
1035      """
1036      if self.debugWamp:
1037         log.msg("publish event %s for topicUri %s" % (str(event), topicUri))
1038
1039      d = Deferred()
1040
1041      if self.subscriptions.has_key(topicUri) and len(self.subscriptions[topicUri]) > 0:
1042
1043         ## FIXME: this might break ordering of event delivery from a
1044         ## receiver perspective. We might need to have send queues
1045         ## per receiver OR do recvs = deque(sorted(..))
1046
1047         ## However, see http://twistedmatrix.com/trac/ticket/1396
1048
1049         if eligible is not None:
1050            subscrbs = set(eligible) & self.subscriptions[topicUri]
1051         else:
1052            subscrbs = self.subscriptions[topicUri]
1053
1054         if len(exclude) > 0:
1055            recvs = subscrbs - set(exclude)
1056         else:
1057            recvs = subscrbs
1058
1059         l = len(recvs)
1060         if l > 0:
1061
1062            ## ok, at least 1 subscriber not excluded and eligible
1063            ## => prepare message for mass sending
1064            ##
1065            o = [WampProtocol.MESSAGE_TYPEID_EVENT, topicUri, event]
1066            try:
1067               msg = self._serialize(o)
1068               if self.debugWamp:
1069                  log.msg("serialized event msg: " + str(msg))
1070            except Exception as e:
1071               raise Exception("invalid type for event - serialization failed [%s]" % e)
1072
1073            preparedMsg = self.prepareMessage(msg)
1074
1075            ## chunked sending of prepared message
1076            ##
1077            self._sendEvents(preparedMsg, recvs.copy(), 0, l, d)
1078
1079         else:
1080            ## receivers list empty after considering exlude and eligible sessions
1081            ##
1082            d.callback((0, 0))
1083      else:
1084         ## no one subscribed on topic
1085         ##
1086         d.callback((0, 0))
1087
1088      return d
1089
1090
1091   def _sendEvents(self, preparedMsg, recvs, delivered, requested, d):
1092      """
1093      Delivers events to receivers in chunks and reenters the reactor
1094      in-between, so that other stuff can run.
1095      """
1096      ## deliver a batch of events
1097      done = False
1098      for i in xrange(0, 256):
1099         try:
1100            proto = recvs.pop()
1101            if proto.state == WebSocketProtocol.STATE_OPEN:
1102               try:
1103                  proto.sendPreparedMessage(preparedMsg)
1104               except:
1105                  pass
1106               else:
1107                  if self.debugWamp:
1108                     log.msg("delivered event to peer %s" % proto.peer)
1109                  delivered += 1
1110         except KeyError:
1111            # all receivers done
1112            done = True
1113            break
1114
1115      if not done:
1116         ## if there are receivers left, redo
1117         self.reactor.callLater(0, self._sendEvents, preparedMsg, recvs, delivered, requested, d)
1118      else:
1119         ## else fire final result
1120         d.callback((delivered, requested))
1121
1122
1123   def _addSession(self, proto, session_id):
1124      """
1125      Add proto for session ID.
1126      """
1127      if not self.protoToSessions.has_key(proto):
1128         self.protoToSessions[proto] = session_id
1129      else:
1130         raise Exception("logic error - dublicate _addSession for protoToSessions")
1131      if not self.sessionsToProto.has_key(session_id):
1132         self.sessionsToProto[session_id] = proto
1133      else:
1134         raise Exception("logic error - dublicate _addSession for sessionsToProto")
1135
1136
1137   def _removeSession(self, proto):
1138      """
1139      Remove session by proto.
1140      """
1141      if self.protoToSessions.has_key(proto):
1142         session_id = self.protoToSessions[proto]
1143         del self.protoToSessions[proto]
1144         if self.sessionsToProto.has_key(session_id):
1145            del self.sessionsToProto[session_id]
1146
1147
1148   def sessionIdToProto(self, sessionId):
1149      """
1150      Map WAMP session ID to connected protocol instance (object of type WampServerProtocol).
1151
1152      :param sessionId: WAMP session ID to be mapped.
1153      :type sessionId: str
1154
1155      :returns obj -- WampServerProtocol instance or None.
1156      """
1157      return self.sessionsToProto.get(sessionId, None)
1158
1159
1160   def sessionIdsToProtos(self, sessionIds):
1161      """
1162      Map WAMP session IDs to connected protocol instances (objects of type WampServerProtocol).
1163
1164      :param sessionIds: List of session IDs to be mapped.
1165      :type sessionIds: list of str
1166
1167      :returns list -- List of WampServerProtocol instances corresponding to the WAMP session IDs.
1168      """
1169      protos = []
1170      for s in sessionIds:
1171         if self.sessionsToProto.has_key(s):
1172            protos.append(self.sessionsToProto[s])
1173      return protos
1174
1175
1176   def protoToSessionId(self, proto):
1177      """
1178      Map connected protocol instance (object of type WampServerProtocol) to WAMP session ID.
1179
1180      :param proto: Instance of WampServerProtocol to be mapped.
1181      :type proto: obj of WampServerProtocol
1182
1183      :returns str -- WAMP session ID or None.
1184      """
1185      return self.protoToSessions.get(proto, None)
1186
1187
1188   def protosToSessionIds(self, protos):
1189      """
1190      Map connected protocol instances (objects of type WampServerProtocol) to WAMP session IDs.
1191
1192      :param protos: List of instances of WampServerProtocol to be mapped.
1193      :type protos: list of WampServerProtocol
1194
1195      :returns list -- List of WAMP session IDs corresponding to the protos.
1196      """
1197      sessionIds = []
1198      for p in protos:
1199         if self.protoToSessions.has_key(p):
1200            sessionIds.append(self.protoToSessions[p])
1201      return sessionIds
1202
1203
1204   def startFactory(self):
1205      """
1206      Called by Twisted when the factory starts up. When overriding, make
1207      sure to call the base method.
1208      """
1209      if self.debugWamp:
1210         log.msg("WampServerFactory starting")
1211      self.subscriptions = {}
1212      self.protoToSessions = {}
1213      self.sessionsToProto = {}
1214
1215
1216   def stopFactory(self):
1217      """
1218      Called by Twisted when the factory shuts down. When overriding, make
1219      sure to call the base method.
1220      """
1221      if self.debugWamp:
1222         log.msg("WampServerFactory stopped")
1223
1224
1225
1226class WampClientProtocol(WebSocketClientProtocol, WampProtocol):
1227   """
1228   Twisted client protocol for WAMP.
1229   """
1230
1231   def onSessionOpen(self):
1232      """
1233      Callback fired when WAMP session was fully established. Override
1234      in derived class.
1235      """
1236      pass
1237
1238
1239   def onOpen(self):
1240      ## do nothing here .. onSessionOpen is only fired when welcome
1241      ## message was received (and thus session ID set)
1242      pass
1243
1244
1245   def onConnect(self, connectionResponse):
1246      if connectionResponse.protocol not in self.factory.protocols:
1247         raise Exception("server does not speak WAMP")
1248
1249
1250   def connectionMade(self):
1251      WebSocketClientProtocol.connectionMade(self)
1252      WampProtocol.connectionMade(self)
1253
1254      self.subscriptions = {}
1255
1256      self.handlerMapping = {
1257         self.MESSAGE_TYPEID_CALL: CallHandler(self, self.prefixes),
1258         self.MESSAGE_TYPEID_CALL_RESULT: CallResultHandler(self, self.prefixes),
1259         self.MESSAGE_TYPEID_CALL_ERROR: CallErrorHandler(self, self.prefixes)}
1260
1261
1262   def connectionLost(self, reason):
1263      WampProtocol.connectionLost(self, reason)
1264      WebSocketClientProtocol.connectionLost(self, reason)
1265
1266
1267   def sendMessage(self, payload):
1268      if self.debugWamp:
1269         log.msg("TX WAMP: %s" % str(payload))
1270      WebSocketClientProtocol.sendMessage(self, payload)
1271
1272
1273   def onMessage(self, msg, binary):
1274      """Internal method to handle WAMP messages received from WAMP server."""
1275
1276      ## WAMP is text message only
1277      ##
1278      if binary:
1279         self._protocolError("binary WebSocket message received")
1280         return
1281
1282      if self.debugWamp:
1283         log.msg("RX WAMP: %s" % str(msg))
1284
1285      ## WAMP is proper JSON payload
1286      ##
1287      try:
1288         obj = self.factory._unserialize(msg)
1289      except Exception as e:
1290         self._protocolError("WAMP message payload could not be unserialized [%s]" % e)
1291         return
1292
1293      ## Every WAMP message is a list
1294      ##
1295      if type(obj) != list:
1296         self._protocolError("WAMP message payload not a list")
1297         return
1298
1299      ## Every WAMP message starts with an integer for message type
1300      ##
1301      if len(obj) < 1:
1302         self._protocolError("WAMP message without message type")
1303         return
1304      if type(obj[0]) != int:
1305         self._protocolError("WAMP message type not an integer")
1306         return
1307
1308      ## WAMP message type
1309      ##
1310      msgtype = obj[0]
1311
1312      ## Valid WAMP message types received by WAMP clients
1313      ##
1314      if msgtype not in [WampProtocol.MESSAGE_TYPEID_WELCOME,
1315                         WampProtocol.MESSAGE_TYPEID_CALL,
1316                         WampProtocol.MESSAGE_TYPEID_CALL_RESULT,
1317                         WampProtocol.MESSAGE_TYPEID_CALL_ERROR,
1318                         WampProtocol.MESSAGE_TYPEID_EVENT]:
1319         self._protocolError("invalid WAMP message type %d" % msgtype)
1320         return
1321
1322      if msgtype in [WampProtocol.MESSAGE_TYPEID_CALL,
1323                     WampProtocol.MESSAGE_TYPEID_CALL_RESULT,
1324                     WampProtocol.MESSAGE_TYPEID_CALL_ERROR]:
1325         self.handlerMapping[msgtype].handleMessage(obj)
1326
1327      ## WAMP EVENT
1328      ##
1329      elif msgtype == WampProtocol.MESSAGE_TYPEID_EVENT:
1330         ## Topic
1331         ##
1332         if len(obj) != 3:
1333            self._protocolError("WAMP EVENT message invalid length %d" % len(obj))
1334            return
1335         if type(obj[1]) not in [unicode, str]:
1336            self._protocolError("invalid type for <topic> in WAMP EVENT message")
1337            return
1338         unresolvedTopicUri = str(obj[1])
1339         topicUri = self.prefixes.resolveOrPass(unresolvedTopicUri) ### PFX - remove
1340
1341         ## Fire PubSub Handler
1342         ##
1343         if self.subscriptions.has_key(topicUri):
1344            event = obj[2]
1345            # noinspection PyCallingNonCallable
1346            self.subscriptions[topicUri](topicUri, event)
1347         else:
1348            ## event received for non-subscribed topic (could be because we
1349            ## just unsubscribed, and server already sent out event for
1350            ## previous subscription)
1351            pass
1352
1353      ## WAMP WELCOME
1354      ##
1355      elif msgtype == WampProtocol.MESSAGE_TYPEID_WELCOME:
1356         ## Session ID
1357         ##
1358         if len(obj) < 2:
1359            self._protocolError("WAMP WELCOME message invalid length %d" % len(obj))
1360            return
1361         if type(obj[1]) not in [unicode, str]:
1362            self._protocolError("invalid type for <sessionid> in WAMP WELCOME message")
1363            return
1364         self.session_id = str(obj[1])
1365
1366         ## WAMP Protocol Version
1367         ##
1368         if len(obj) > 2:
1369            if type(obj[2]) not in [int]:
1370               self._protocolError("invalid type for <version> in WAMP WELCOME message")
1371               return
1372            else:
1373               self.session_protocol_version = obj[2]
1374         else:
1375            self.session_protocol_version = None
1376
1377         ## Server Ident
1378         ##
1379         if len(obj) > 3:
1380            if type(obj[3]) not in [unicode, str]:
1381               self._protocolError("invalid type for <server> in WAMP WELCOME message")
1382               return
1383            else:
1384               self.session_server = obj[3]
1385         else:
1386            self.session_server = None
1387
1388         self.onSessionOpen()
1389
1390      else:
1391         raise Exception("logic error")
1392
1393
1394   def prefix(self, prefix, uri):
1395      """
1396      Establishes a prefix to be used in `CURIEs <http://en.wikipedia.org/wiki/CURIE>`_
1397      instead of URIs having that prefix for both client-to-server and
1398      server-to-client messages.
1399
1400      :param prefix: Prefix to be used in CURIEs.
1401      :type prefix: str
1402      :param uri: URI that this prefix will resolve to.
1403      :type uri: str
1404      """
1405
1406      if type(prefix) != str:
1407         raise Exception("invalid type for prefix")
1408
1409      if type(uri) not in [unicode, str]:
1410         raise Exception("invalid type for URI")
1411
1412      if self.prefixes.get(prefix):  ### PFX - keep
1413         raise Exception("prefix already defined")
1414
1415      self.prefixes.set(prefix, uri) ### PFX - keep
1416
1417      msg = [WampProtocol.MESSAGE_TYPEID_PREFIX, prefix, uri]
1418
1419      self.sendMessage(self.factory._serialize(msg))
1420
1421
1422   def publish(self, topicUri, event, excludeMe = None, exclude = None, eligible = None):
1423      """
1424      Publish an event under a topic URI. The latter may be abbreviated using a
1425      CURIE which has been previously defined using prefix(). The event must
1426      be JSON serializable.
1427
1428      :param topicUri: The topic URI or CURIE.
1429      :type topicUri: str
1430      :param event: Event to be published (must be JSON serializable) or None.
1431      :type event: value
1432      :param excludeMe: When True, don't deliver the published event to myself (when I'm subscribed).
1433      :type excludeMe: bool
1434      :param exclude: Optional list of session IDs to exclude from receivers.
1435      :type exclude: list of str
1436      :param eligible: Optional list of session IDs to that are eligible as receivers.
1437      :type eligible: list of str
1438      """
1439
1440      if type(topicUri) not in [unicode, str]:
1441         raise Exception("invalid type for parameter 'topicUri' - must be string (was %s)" % type(topicUri))
1442
1443      if excludeMe is not None:
1444         if type(excludeMe) != bool:
1445            raise Exception("invalid type for parameter 'excludeMe' - must be bool (was %s)" % type(excludeMe))
1446
1447      if exclude is not None:
1448         if type(exclude) != list:
1449            raise Exception("invalid type for parameter 'exclude' - must be list (was %s)" % type(exclude))
1450
1451      if eligible is not None:
1452         if type(eligible) != list:
1453            raise Exception("invalid type for parameter 'eligible' - must be list (was %s)" % type(eligible))
1454
1455      if exclude is not None or eligible is not None:
1456         if exclude is None:
1457            if excludeMe is not None:
1458               if excludeMe:
1459                  exclude = [self.session_id]
1460               else:
1461                  exclude = []
1462            else:
1463               exclude = [self.session_id]
1464         if eligible is not None:
1465            msg = [WampProtocol.MESSAGE_TYPEID_PUBLISH, topicUri, event, exclude, eligible]
1466         else:
1467            msg = [WampProtocol.MESSAGE_TYPEID_PUBLISH, topicUri, event, exclude]
1468      else:
1469         if excludeMe:
1470            msg = [WampProtocol.MESSAGE_TYPEID_PUBLISH, topicUri, event]
1471         else:
1472            msg = [WampProtocol.MESSAGE_TYPEID_PUBLISH, topicUri, event, excludeMe]
1473
1474      try:
1475         o = self.factory._serialize(msg)
1476      except:
1477         raise Exception("invalid type for parameter 'event' - not JSON serializable")
1478
1479      self.sendMessage(o)
1480
1481
1482   def subscribe(self, topicUri, handler):
1483      """
1484      Subscribe to topic. When already subscribed, will overwrite the handler.
1485
1486      :param topicUri: URI or CURIE of topic to subscribe to.
1487      :type topicUri: str
1488      :param handler: Event handler to be invoked upon receiving events for topic.
1489      :type handler: Python callable, will be called as in <callable>(eventUri, event).
1490      """
1491      if type(topicUri) not in [unicode, str]:
1492         raise Exception("invalid type for parameter 'topicUri' - must be string (was %s)" % type(topicUri))
1493
1494      if not hasattr(handler, '__call__'):
1495         raise Exception("invalid type for parameter 'handler' - must be a callable (was %s)" % type(handler))
1496
1497      turi = self.prefixes.resolveOrPass(topicUri) ### PFX - keep
1498      if not self.subscriptions.has_key(turi):
1499         msg = [WampProtocol.MESSAGE_TYPEID_SUBSCRIBE, topicUri]
1500         o = self.factory._serialize(msg)
1501         self.sendMessage(o)
1502      self.subscriptions[turi] = handler
1503
1504
1505   def unsubscribe(self, topicUri):
1506      """
1507      Unsubscribe from topic. Will do nothing when currently not subscribed to the topic.
1508
1509      :param topicUri: URI or CURIE of topic to unsubscribe from.
1510      :type topicUri: str
1511      """
1512      if type(topicUri) not in [unicode, str]:
1513         raise Exception("invalid type for parameter 'topicUri' - must be string (was %s)" % type(topicUri))
1514
1515      turi = self.prefixes.resolveOrPass(topicUri) ### PFX - keep
1516      if self.subscriptions.has_key(turi):
1517         msg = [WampProtocol.MESSAGE_TYPEID_UNSUBSCRIBE, topicUri]
1518         o = self.factory._serialize(msg)
1519         self.sendMessage(o)
1520         del self.subscriptions[turi]
1521
1522
1523
1524class WampClientFactory(WebSocketClientFactory, WampFactory):
1525   """
1526   Twisted client factory for WAMP.
1527   """
1528
1529   protocol = WampClientProtocol
1530
1531   def __init__(self,
1532                url,
1533                debug = False,
1534                debugCodePaths = False,
1535                debugWamp = False,
1536                debugApp = False,
1537                reactor = None):
1538      self.debugWamp = debugWamp
1539      self.debugApp = debugApp
1540      WebSocketClientFactory.__init__(self,
1541                                      url,
1542                                      protocols = ["wamp"],
1543                                      debug = debug,
1544                                      debugCodePaths = debugCodePaths,
1545                                      reactor = reactor)
1546      WampFactory.__init__(self)
1547
1548
1549   def startFactory(self):
1550      """
1551      Called by Twisted when the factory starts up. When overriding, make
1552      sure to call the base method.
1553      """
1554      if self.debugWamp:
1555         log.msg("WebSocketClientFactory starting")
1556
1557
1558   def stopFactory(self):
1559      """
1560      Called by Twisted when the factory shuts down. When overriding, make
1561      sure to call the base method.
1562      """
1563      if self.debugWamp:
1564         log.msg("WebSocketClientFactory stopped")
1565
1566
1567
1568class WampCraProtocol(WampProtocol):
1569   """
1570   Base class for WAMP Challenge-Response Authentication protocols (client and server).
1571
1572   WAMP-CRA is a cryptographically strong challenge response authentication
1573   protocol based on HMAC-SHA256.
1574
1575   The protocol performs in-band authentication of WAMP clients to WAMP servers.
1576
1577   WAMP-CRA does not introduce any new WAMP protocol level message types, but
1578   implements the authentication handshake via standard WAMP RPCs with well-known
1579   procedure URIs and signatures.
1580   """
1581
1582   def deriveKey(secret, extra = None):
1583      """
1584      Computes a derived cryptographic key from a password according to PBKDF2
1585      http://en.wikipedia.org/wiki/PBKDF2.
1586
1587      The function will only return a derived key if at least 'salt' is
1588      present in the 'extra' dictionary. The complete set of attributes
1589      that can be set in 'extra':
1590
1591         salt: The salt value to be used.
1592         iterations: Number of iterations of derivation algorithm to run.
1593         keylen: Key length to derive.
1594
1595      :returns str -- The derived key or the original secret.
1596      """
1597      if type(extra) == dict and extra.has_key('salt'):
1598         salt = str(extra['salt'])
1599         iterations = int(extra.get('iterations', 10000))
1600         keylen = int(extra.get('keylen', 32))
1601         b = pbkdf2_bin(secret, salt, iterations, keylen, hashlib.sha256)
1602         return binascii.b2a_base64(b).strip()
1603      else:
1604         return secret
1605
1606   deriveKey = staticmethod(deriveKey)
1607
1608
1609   def authSignature(self, authChallenge, authSecret = None, authExtra = None):
1610      """
1611      Compute the authentication signature from an authentication challenge and a secret.
1612
1613      :param authChallenge: The authentication challenge.
1614      :type authChallenge: str
1615      :param authSecret: The authentication secret.
1616      :type authSecret: str
1617      :authExtra: Extra authentication information for salting the secret. (salt, keylen,
1618              iterations)
1619      :type authExtra: dict
1620
1621      :returns str -- The authentication signature.
1622      """
1623      if authSecret is None:
1624         authSecret = ""
1625      if isinstance(authSecret, unicode):
1626         authSecret = authSecret.encode('utf8')
1627      authSecret = WampCraProtocol.deriveKey(authSecret, authExtra)
1628      h = hmac.new(authSecret, authChallenge, hashlib.sha256)
1629      sig = binascii.b2a_base64(h.digest()).strip()
1630      return sig
1631
1632
1633
1634class WampCraClientProtocol(WampClientProtocol, WampCraProtocol):
1635   """
1636   Simple, authenticated WAMP client protocol.
1637
1638   The client can perform WAMP-Challenge-Response-Authentication ("WAMP-CRA") to authenticate
1639   itself to a WAMP server. The server needs to implement WAMP-CRA also of course.
1640   """
1641
1642   def authenticate(self, authKey = None, authExtra = None, authSecret = None):
1643      """
1644      Authenticate the WAMP session to server.
1645
1646      :param authKey: The key of the authentication credentials, something like a user or application name.
1647      :type authKey: str
1648      :param authExtra: Any extra authentication information.
1649      :type authExtra: dict
1650      :param authSecret: The secret of the authentication credentials, something like the user password or application secret key.
1651      :type authsecret: str
1652
1653      :returns Deferred -- Deferred that fires upon authentication success (with permissions) or failure.
1654      """
1655
1656      def _onAuthChallenge(challenge):
1657         if authKey is not None:
1658            challengeObj =  self.factory._unserialize(challenge)
1659            if 'authextra' in challengeObj:
1660                authExtra = challengeObj['authextra']
1661                sig = self.authSignature(challenge, authSecret, authExtra)
1662            else:
1663                sig = self.authSignature(challenge, authSecret)
1664         else:
1665            sig = None
1666         d = self.call(WampProtocol.URI_WAMP_PROCEDURE + "auth", sig)
1667         return d
1668
1669      d = self.call(WampProtocol.URI_WAMP_PROCEDURE + "authreq", authKey, authExtra)
1670      d.addCallback(_onAuthChallenge)
1671      return d
1672
1673
1674
1675class WampCraServerProtocol(WampServerProtocol, WampCraProtocol):
1676   """
1677   Simple, authenticating WAMP server protocol.
1678
1679   The server lets clients perform WAMP-Challenge-Response-Authentication ("WAMP-CRA")
1680   to authenticate. The clients need to implement WAMP-CRA also of course.
1681
1682   To implement an authenticating server, override:
1683
1684      * getAuthSecret
1685      * getAuthPermissions
1686      * onAuthenticated
1687
1688   in your class deriving from this class.
1689   """
1690
1691   clientAuthTimeout = 0
1692   """
1693   Client authentication timeout in seconds or 0 for infinite. A client
1694   must perform authentication after the initial WebSocket handshake within
1695   this timeout or the connection is failed.
1696   """
1697
1698   clientAuthAllowAnonymous = True
1699   """
1700   Allow anonymous client authentication. When this is set to True, a client
1701   may "authenticate" as anonymous.
1702   """
1703
1704
1705   def getAuthPermissions(self, authKey, authExtra):
1706      """
1707      Get the permissions the session is granted when the authentication succeeds
1708      for the given key / extra information.
1709
1710      Override in derived class to implement your authentication.
1711
1712      A permissions object is structured like this::
1713
1714         {'permissions': {'rpc': [
1715                                    {'uri':  / RPC Endpoint URI - String /,
1716                                     'call': / Allow to call? - Boolean /}
1717                                 ],
1718                          'pubsub': [
1719                                       {'uri':    / PubSub Topic URI / URI prefix - String /,
1720                                        'prefix': / URI matched by prefix? - Boolean /,
1721                                        'pub':    / Allow to publish? - Boolean /,
1722                                        'sub':    / Allow to subscribe? - Boolean /}
1723                                    ]
1724                          }
1725         }
1726
1727      You can add custom information to this object. The object will be provided again
1728      when the client authentication succeeded in :meth:`onAuthenticated`.
1729
1730      :param authKey: The authentication key.
1731      :type authKey: str
1732      :param authExtra: Authentication extra information.
1733      :type authExtra: dict
1734
1735      :returns obj or Deferred -- Return a permissions object or None when no permissions granted.
1736      """
1737      return None
1738
1739
1740   def getAuthSecret(self, authKey):
1741      """
1742      Get the authentication secret for an authentication key, i.e. the
1743      user password for the user name. Return None when the authentication
1744      key does not exist.
1745
1746      Override in derived class to implement your authentication.
1747
1748      :param authKey: The authentication key.
1749      :type authKey: str
1750
1751      :returns str or Deferred -- The authentication secret for the key or None when the key does not exist.
1752      """
1753      return None
1754
1755
1756   def onAuthTimeout(self):
1757      """
1758      Fired when the client does not authenticate itself in time. The default implementation
1759      will simply fail the connection.
1760
1761      May be overridden in derived class.
1762      """
1763      if not self._clientAuthenticated:
1764         log.msg("failing connection upon client authentication timeout [%s secs]" % self.clientAuthTimeout)
1765         self.failConnection()
1766
1767
1768   def onAuthenticated(self, authKey, permissions):
1769      """
1770      Fired when client authentication was successful.
1771
1772      Override in derived class and register PubSub topics and/or RPC endpoints.
1773
1774      :param authKey: The authentication key the session was authenticated for.
1775      :type authKey: str
1776      :param permissions: The permissions object returned from :meth:`getAuthPermissions`.
1777      :type permissions: obj
1778      """
1779      pass
1780
1781
1782   def registerForPubSubFromPermissions(self, permissions):
1783      """
1784      Register topics for PubSub from auth permissions.
1785
1786      :param permissions: The permissions granted to the now authenticated client.
1787      :type permissions: list
1788      """
1789      for p in permissions['pubsub']:
1790         ## register topics for the clients
1791         ##
1792         pubsub = (WampServerProtocol.PUBLISH if p['pub'] else 0) | \
1793                  (WampServerProtocol.SUBSCRIBE if p['sub'] else 0)
1794         topic = p['uri']
1795         if self.pubHandlers.has_key(topic) or self.subHandlers.has_key(topic):
1796            ## FIXME: handle dups!
1797            log.msg("DUPLICATE TOPIC PERMISSION !!! " + topic)
1798         self.registerForPubSub(topic, p['prefix'], pubsub)
1799
1800
1801   def onSessionOpen(self):
1802      """
1803      Called when WAMP session has been established, but not yet authenticated. The default
1804      implementation will prepare the session allowing the client to authenticate itself.
1805      """
1806
1807      ## register RPC endpoints for WAMP-CRA authentication
1808      ##
1809      self.registerForRpc(self, WampProtocol.URI_WAMP_PROCEDURE, [WampCraServerProtocol.authRequest,
1810                                                                  WampCraServerProtocol.auth])
1811
1812      ## reset authentication state
1813      ##
1814      self._clientAuthenticated = False
1815      self._clientPendingAuth = None
1816      self._clientAuthTimeoutCall = None
1817
1818      ## client authentication timeout
1819      ##
1820      if self.clientAuthTimeout > 0:
1821         self._clientAuthTimeoutCall = self.factory.reactor.callLater(self.clientAuthTimeout, self.onAuthTimeout)
1822
1823
1824   @exportRpc("authreq")
1825   def authRequest(self, authKey = None, extra = None):
1826      """
1827      RPC endpoint for clients to initiate the authentication handshake.
1828
1829      :param authKey: Authentication key, such as user name or application name.
1830      :type authKey: str
1831      :param extra: Authentication extra information.
1832      :type extra: dict
1833
1834      :returns str -- Authentication challenge. The client will need to create an authentication signature from this.
1835      """
1836
1837      ## check authentication state
1838      ##
1839      if self._clientAuthenticated:
1840         raise Exception(self.shrink(WampProtocol.URI_WAMP_ERROR + "already-authenticated"), "already authenticated")
1841      if self._clientPendingAuth is not None:
1842         raise Exception(self.shrink(WampProtocol.URI_WAMP_ERROR + "authentication-already-requested"), "authentication request already issues - authentication pending")
1843
1844      ## check extra
1845      ##
1846      if extra:
1847         if type(extra) != dict:
1848            raise Exception(self.shrink(WampProtocol.URI_WAMP_ERROR + "invalid-argument"), "extra not a dictionary (was %s)." % str(type(extra)))
1849      else:
1850         extra = {}
1851      #for k in extra:
1852      #   if type(extra[k]) not in [str, unicode, int, long, float, bool, types.NoneType]:
1853      #      raise Exception(self.shrink(WampProtocol.URI_WAMP_ERROR + "invalid-argument"), "attribute '%s' in extra not a primitive type (was %s)" % (k, str(type(extra[k]))))
1854
1855      ## check authKey
1856      ##
1857      if authKey is None and not self.clientAuthAllowAnonymous:
1858         raise Exception(self.shrink(WampProtocol.URI_WAMP_ERROR + "anonymous-auth-forbidden"), "authentication as anonymous forbidden")
1859
1860      if type(authKey) not in [str, unicode, types.NoneType]:
1861         raise Exception(self.shrink(WampProtocol.URI_WAMP_ERROR + "invalid-argument"), "authentication key must be a string (was %s)" % str(type(authKey)))
1862
1863      d = maybeDeferred(self.getAuthSecret, authKey)
1864
1865      def onGetAuthSecretOk(authSecret, authKey, extra):
1866         if authKey is not None and authSecret is None:
1867            raise Exception(self.shrink(WampProtocol.URI_WAMP_ERROR + "no-such-authkey"), "authentication key '%s' does not exist." % authKey)
1868
1869         ## each authentication request gets a unique authid, which can only be used (later) once!
1870         ##
1871         authid = newid()
1872
1873         ## create authentication challenge
1874         ##
1875         info = {'authid': authid, 'authkey': authKey, 'timestamp': utcnow(), 'sessionid': self.session_id,
1876                 'extra': extra}
1877
1878         pp = maybeDeferred(self.getAuthPermissions, authKey, extra)
1879
1880         def onAuthPermissionsOk(res):
1881            if res is None:
1882               res = {'permissions': {'pubsub': [], 'rpc': []}}
1883            info['permissions'] = res['permissions']
1884            if 'authextra' in res:
1885                info['authextra'] = res['authextra']
1886
1887            if authKey:
1888               ## authenticated session
1889               ##
1890               infoser = self.factory._serialize(info)
1891               sig = self.authSignature(infoser, authSecret)
1892
1893               self._clientPendingAuth = (info, sig, res)
1894               return infoser
1895            else:
1896               ## anonymous session
1897               ##
1898               self._clientPendingAuth = (info, None, res)
1899               return None
1900
1901         def onAuthPermissionsError(e):
1902            raise Exception(self.shrink(WampProtocol.URI_WAMP_ERROR + "auth-permissions-error"), str(e))
1903
1904         pp.addCallbacks(onAuthPermissionsOk, onAuthPermissionsError)
1905
1906         return pp
1907
1908      d.addCallback(onGetAuthSecretOk, authKey, extra)
1909      return d
1910
1911
1912   @exportRpc("auth")
1913   def auth(self, signature = None):
1914      """
1915      RPC endpoint for clients to actually authenticate after requesting authentication and computing
1916      a signature from the authentication challenge.
1917
1918      :param signature: Authentication signature computed by the client.
1919      :type signature: str
1920
1921      :returns list -- A list of permissions the client is granted when authentication was successful.
1922      """
1923
1924      ## check authentication state
1925      ##
1926      if self._clientAuthenticated:
1927         raise Exception(self.shrink(WampProtocol.URI_WAMP_ERROR + "already-authenticated"), "already authenticated")
1928      if self._clientPendingAuth is None:
1929         raise Exception(self.shrink(WampProtocol.URI_WAMP_ERROR + "no-authentication-requested"), "no authentication previously requested")
1930
1931      ## check signature
1932      ##
1933      if type(signature) not in [str, unicode, types.NoneType]:
1934         raise Exception(self.shrink(WampProtocol.URI_WAMP_ERROR + "invalid-argument"), "signature must be a string or None (was %s)" % str(type(signature)))
1935      if self._clientPendingAuth[1] != signature:
1936         ## delete pending authentication, so that no retries are possible. authid is only valid for 1 try!!
1937         ##
1938         self._clientPendingAuth = None
1939
1940         ## notify the client of failed authentication, but only after a random,
1941         ## exponentially distributed delay. this (further) protects against
1942         ## timing attacks
1943         ##
1944         d = Deferred()
1945         def fail():
1946            ## FIXME: (optionally) drop the connection instead of returning RPC error?
1947            ##
1948            d.errback(Exception(self.shrink(WampProtocol.URI_WAMP_ERROR + "invalid-signature"), "signature for authentication request is invalid"))
1949         failDelaySecs = random.expovariate(1.0 / 0.8) # mean = 0.8 secs
1950         self.factory.reactor.callLater(failDelaySecs, fail)
1951         return d
1952
1953      ## at this point, the client has successfully authenticated!
1954
1955      ## get the permissions we determined earlier
1956      ##
1957      perms = self._clientPendingAuth[2]
1958
1959      ## delete auth request and mark client as authenticated
1960      ##
1961      authKey = self._clientPendingAuth[0]['authkey']
1962      self._clientAuthenticated = True
1963      self._clientPendingAuth = None
1964      if self._clientAuthTimeoutCall is not None:
1965         self._clientAuthTimeoutCall.cancel()
1966         self._clientAuthTimeoutCall = None
1967
1968      ## fire authentication callback
1969      ##
1970      self.onAuthenticated(authKey, perms)
1971
1972      ## return permissions to client
1973      ##
1974      return perms['permissions']
1975
1976
1977
1978class Call:
1979   """
1980   Thin-wrapper for incoming RPCs provided to call handlers registered via
1981
1982     - registerHandlerMethodForRpc
1983     - registerHandlerProcedureForRpc
1984   """
1985
1986
1987   def __init__(self,
1988             proto,
1989             callid,
1990             uri,
1991             args,
1992             extra = None):
1993      self.proto = proto
1994      self.callid = callid
1995      self.uri = uri
1996      self.args = args
1997      self.extra = extra
1998      if self.proto.trackTimings:
1999          self.timings = Tracker(tracker=None, tracked=None)
2000      else:
2001          self.timings = None
2002
2003   def track(self, key):
2004       if self.timings:
2005           self.timings.track(key)
2006
2007
2008
2009class Handler(object):
2010   """
2011   A handler for a certain class of messages.
2012   """
2013
2014
2015   typeid = None
2016   tracker = None
2017
2018
2019   def __init__(self, proto, prefixes):
2020      """
2021      Remember protocol and prefix map in instance variables.
2022      """
2023      self.proto = proto
2024      self.prefixes = prefixes
2025
2026
2027   def handleMessage(self, msg_parts):
2028      """
2029      Template method for handling a message.
2030
2031      Check if the correct handler for the message type was
2032      called. Afterwards, assign all relevant parts of the message to
2033      instance variables and call the (overridden) method
2034      _handleMessage to actually handle the message.
2035      """
2036      msgtype = msg_parts[0]
2037      if self.typeid:
2038         assert msgtype == self.typeid, \
2039             "Message type %s does not match type id %s" % (msgtype,
2040                                                            self.typeid)
2041      else:
2042         assert False, \
2043             "No typeid defined for %s" % self.__class__.__name__
2044
2045      if self._messageIsValid(msg_parts):
2046         self._parseMessageParts(msg_parts)
2047         self._handleMessage()
2048
2049
2050   def _parseMessageParts(self, msg_parts):
2051      """
2052      Assign the message parts to instance variables.
2053      Has to be overridden in subclasses.
2054      """
2055      raise NotImplementedError
2056
2057   def _messageIsValid(self, msg_parts):
2058      """
2059      Check if the message parts have expected properties (type, etc.).
2060      Has to be overridden in subclasses.
2061      """
2062      raise NotImplementedError
2063
2064
2065   def _handleMessage(self):
2066      """
2067      Handle a specific kind of message.
2068      Has to be overridden in subclasses.
2069      """
2070      raise NotImplementedError
2071
2072
2073
2074class CallHandler(Handler):
2075   """
2076   A handler for incoming RPC calls.
2077   """
2078
2079   typeid = WampProtocol.MESSAGE_TYPEID_CALL
2080
2081
2082   def _messageIsValid(self, msg_parts):
2083      callid, uri = msg_parts[1:3]
2084      if not isinstance(callid, (str, unicode)):
2085         self.proto._protocolError(
2086            ("WAMP CALL message with invalid type %s for "
2087            "<callid>") % type(callid))
2088         return False
2089
2090      if not isinstance(uri, (str, unicode)):
2091         self.proto._protocolError(
2092            ("WAMP CALL message with invalid type %s for "
2093            "<uri>") % type(uri))
2094         return False
2095
2096      return True
2097
2098
2099   def _parseMessageParts(self, msg_parts):
2100      """
2101      Parse message and create call object.
2102      """
2103      self.callid = msg_parts[1]
2104      self.uri = self.prefixes.resolveOrPass(msg_parts[2]) ### PFX - remove
2105      self.args = msg_parts[3:]
2106
2107
2108   def _handleMessage(self):
2109      """
2110      Perform the RPC call and attach callbacks to its deferred object.
2111      """
2112      call = self._onBeforeCall()
2113
2114      ## execute incoming RPC
2115      d = maybeDeferred(self._callProcedure, call)
2116
2117      ## register callback and errback with extra argument call
2118      d.addCallbacks(self._onAfterCallSuccess,
2119                     self._onAfterCallError,
2120                     callbackArgs = (call,),
2121                     errbackArgs = (call,))
2122
2123
2124   def _onBeforeCall(self):
2125      """
2126      Create call object to move around call data
2127      """
2128      uri, args = self.proto.onBeforeCall(self.callid, self.uri, self.args, bool(self.proto.procForUri(self.uri)))
2129
2130      call = Call(self.proto, self.callid, uri, args)
2131      call.track("onBeforeCall")
2132      return call
2133
2134
2135   def _callProcedure(self, call):
2136      """
2137      Actually performs the call of a procedure invoked via RPC.
2138      """
2139      m = self.proto.procForUri(call.uri)
2140      if m is None:
2141         raise Exception(WampProtocol.URI_WAMP_ERROR_NO_SUCH_RPC_ENDPOINT, "No RPC endpoint registered for %s." % call.uri)
2142
2143      obj, method_or_proc, is_handler = m[:3]
2144      if not is_handler:
2145         return self._performProcedureCall(call, obj, method_or_proc)
2146      else:
2147         call.extra = m[3]
2148         return self._delegateToRpcHandler(call, obj, method_or_proc)
2149
2150
2151   def _performProcedureCall(self, call, obj, method_or_proc):
2152      """
2153      Perform a RPC method / procedure call.
2154      """
2155      cargs = tuple(call.args) if call.args else ()
2156      if obj:
2157         ## call object method
2158         return method_or_proc(obj, *cargs)
2159      else:
2160         ## call free-standing function/procedure
2161         return method_or_proc(*cargs)
2162
2163
2164   def _delegateToRpcHandler(self, call, obj, method_or_proc):
2165      """
2166      Delegate call to RPC handler.
2167      """
2168      if obj:
2169         ## call RPC handler on object
2170         return method_or_proc(obj, call)
2171      else:
2172         ## call free-standing RPC handler
2173         return method_or_proc(call)
2174
2175
2176   def _onAfterCallSuccess(self, result, call):
2177      """
2178      Execute custom success handler and send call result.
2179      """
2180      ## track timing and fire user callback
2181      call.track("onAfterCallSuccess")
2182      call.result = self.proto.onAfterCallSuccess(result, call)
2183
2184      ## send out WAMP message
2185      self._sendCallResult(call)
2186
2187
2188   def _onAfterCallError(self, error, call):
2189      """
2190      Execute custom error handler and send call error.
2191      """
2192      ## track timing and fire user callback
2193      call.track("onAfterCallError")
2194      call.error = self.proto.onAfterCallError(error, call)
2195
2196      ## send out WAMP message
2197      self._sendCallError(call)
2198
2199
2200   def _sendCallResult(self, call):
2201      """
2202      Marshal and send a RPC success result.
2203      """
2204      msg = [WampProtocol.MESSAGE_TYPEID_CALL_RESULT, call.callid, call.result]
2205      try:
2206         rmsg = self.proto.serializeMessage(msg)
2207      except:
2208         raise Exception("call result not JSON serializable")
2209      else:
2210         ## now actually send WAMP message
2211         self.proto.sendMessage(rmsg)
2212
2213         ## track timing and fire user callback
2214         call.track("onAfterSendCallSuccess")
2215         self.proto.onAfterSendCallSuccess(rmsg, call)
2216
2217
2218   def _sendCallError(self, call):
2219      """
2220      Marshal and send a RPC error result.
2221      """
2222      killsession = False
2223      rmsg = None
2224      try:
2225         error_info, killsession = self._extractErrorInfo(call)
2226         rmsg = self._assembleErrorMessage(call, *error_info)
2227      except Exception as e:
2228         rmsg = self._handleProcessingError(call, e)
2229      finally:
2230         if rmsg:
2231            ## now actually send WAMP message
2232            self.proto.sendMessage(rmsg)
2233
2234            ## track timing and fire user callback
2235            call.track("onAfterSendCallError")
2236            self.proto.onAfterSendCallError(rmsg, call)
2237
2238            if killsession:
2239               self.proto.sendClose(3000, "killing WAMP session upon request by application exception")
2240         else:
2241            raise Exception("fatal: internal error in CallHandler._sendCallError")
2242
2243
2244   def _extractErrorInfo(self, call):
2245      """
2246      Extract error information from the call.
2247      """
2248      ## get error args and len
2249      ##
2250      eargs = call.error.value.args
2251      nargs = len(eargs)
2252
2253      if nargs > 4:
2254         raise Exception("invalid args length %d for exception" % nargs)
2255
2256      ## erroruri & errordesc
2257      ##
2258      if nargs == 0:
2259         erroruri = WampProtocol.URI_WAMP_ERROR_GENERIC
2260         errordesc = WampProtocol.DESC_WAMP_ERROR_GENERIC
2261      elif nargs == 1:
2262         erroruri = WampProtocol.URI_WAMP_ERROR_GENERIC
2263         errordesc = eargs[0]
2264      else:
2265         erroruri = eargs[0]
2266         errordesc = eargs[1]
2267
2268      ## errordetails
2269      ##
2270      errordetails = None
2271      if nargs >= 3:
2272         errordetails = eargs[2]
2273      elif self.proto.includeTraceback:
2274         try:
2275            ## we'd like to do ..
2276            #tb = call.error.getTraceback()
2277
2278            ## .. but the implementation in Twisted
2279            ## http://twistedmatrix.com/trac/browser/tags/releases/twisted-13.1.0/twisted/python/failure.py#L529
2280            ## uses cStringIO which cannot handle Unicode string in tracebacks. Hence we do our own:
2281            io = StringIO.StringIO()
2282            call.error.printTraceback(file = io)
2283            tb = io.getvalue()
2284
2285         except Exception as ie:
2286            print("INTERNAL ERROR [_extractErrorInfo / getTraceback()]: %s" % ie)
2287            traceback.print_stack()
2288         else:
2289            errordetails = tb.splitlines()
2290
2291      ## killsession
2292      ##
2293      killsession = False
2294      if nargs >= 4:
2295         killsession = eargs[3]
2296
2297      ## recheck all error component types
2298      ##
2299      if type(erroruri) not in [str, unicode]:
2300         raise Exception("invalid type %s for errorUri" % type(erroruri))
2301
2302      if type(errordesc) not in [str, unicode]:
2303         raise Exception("invalid type %s for errorDesc" % type(errordesc))
2304
2305      ## errordetails must be JSON serializable. If not, we get exception later in sendMessage.
2306      ## We don't check here, since the only way would be to serialize to JSON and
2307      ## then we'd serialize twice (here and in sendMessage)
2308
2309      if type(killsession) not in [bool, types.NoneType]:
2310         raise Exception("invalid type %s for killSession" % type(killsession))
2311
2312      return (erroruri, errordesc, errordetails), killsession
2313
2314
2315   def _assembleErrorMessage(self, call, erroruri, errordesc, errordetails):
2316      """
2317      Assemble a WAMP RPC error message.
2318      """
2319      if errordetails is not None:
2320         msg = [WampProtocol.MESSAGE_TYPEID_CALL_ERROR,
2321                call.callid,
2322                self.prefixes.shrink(erroruri), ### PFX - remove
2323                errordesc,
2324                errordetails]
2325      else:
2326         msg = [WampProtocol.MESSAGE_TYPEID_CALL_ERROR,
2327                call.callid,
2328                self.prefixes.shrink(erroruri), ### PFX - remove
2329                errordesc]
2330
2331      ## serialize message. this can fail if errorDetails is not
2332      ## serializable
2333      try:
2334         rmsg = self.proto.serializeMessage(msg)
2335      except Exception as e:
2336         raise Exception(
2337            "invalid object for errorDetails - not serializable (%s)" %
2338            str(e))
2339
2340      return rmsg
2341
2342
2343   def _handleProcessingError(self, call, e):
2344      """
2345      Create a message describing what went wrong during processing an
2346      exception.
2347      """
2348      msg = [WampProtocol.MESSAGE_TYPEID_CALL_ERROR,
2349             call.callid,
2350              ### PFX - remove
2351             self.prefixes.shrink(WampProtocol.URI_WAMP_ERROR_INTERNAL),
2352             str(e)]
2353
2354      if self.proto.includeTraceback:
2355         try:
2356            tb = call.error.getTraceback()
2357         except Exception as ie:
2358            ## FIXME: find out why this can fail with
2359            ## "'unicode' does not have the buffer interface"
2360            print("INTERNAL ERROR (getTraceback): %s" % ie)
2361         else:
2362            msg.append(tb.splitlines())
2363
2364      result = self.proto.serializeMessage(msg)
2365      return result
2366
2367
2368
2369
2370class CallResultHandler(Handler):
2371   """
2372   A handler for to RPC call results.
2373   """
2374
2375   typeid = WampProtocol.MESSAGE_TYPEID_CALL_RESULT
2376
2377
2378   def _messageIsValid(self, msg_parts):
2379      if len(msg_parts) < 2:
2380         self.proto._protocolError(
2381            "WAMP CALL_RESULT message without <callid>")
2382         return False
2383      if len(msg_parts) != 3:
2384         self.proto._protocolError(
2385            "WAMP CALL_RESULT message with invalid length %d" % len(msg_parts))
2386         return False
2387
2388      if type(msg_parts[1]) not in [unicode, str]:
2389         self.proto._protocolError(
2390            ("WAMP CALL_RESULT message with invalid type %s for "
2391            "<callid>") % type(msg_parts[1]))
2392         return False
2393
2394      return True
2395
2396
2397   def _parseMessageParts(self, msg_parts):
2398      """
2399      Extract call result from message parts.
2400      """
2401      self.callid = str(msg_parts[1])
2402      self.result = msg_parts[2]
2403
2404
2405   def _handleMessage(self):
2406      ## Pop and process Call Deferred
2407      ##
2408      d = self.proto.calls.pop(self.callid, None)
2409      if d:
2410         ## WAMP CALL_RESULT
2411         ##
2412         d.callback(self.result)
2413      else:
2414         if self.proto.debugWamp:
2415            log.msg("callid not found for received call result message")
2416
2417
2418
2419class CallErrorHandler(Handler):
2420
2421   typeid = WampProtocol.MESSAGE_TYPEID_CALL_ERROR
2422
2423
2424   def _messageIsValid(self, msg_parts):
2425      if len(msg_parts) not in [4, 5]:
2426         self.proto._protocolError(
2427            "call error message invalid length %d" % len(msg_parts))
2428         return False
2429
2430      ## Error URI
2431      ##
2432      if type(msg_parts[2]) not in [unicode, str]:
2433         self.proto._protocolError(
2434            "invalid type %s for errorUri in call error message" %
2435            str(type(msg_parts[2])))
2436         return False
2437
2438      ## Error Description
2439      ##
2440      if type(msg_parts[3]) not in [unicode, str]:
2441         self.proto._protocolError(
2442            "invalid type %s for errorDesc in call error message" %
2443            str(type(msg_parts[3])))
2444         return False
2445
2446      return True
2447
2448
2449   def _parseMessageParts(self, msg_parts):
2450      """
2451      Extract error information from message parts.
2452      """
2453      self.callid = str(msg_parts[1])
2454      self.erroruri = str(msg_parts[2])
2455      self.errordesc = str(msg_parts[3])
2456
2457      ## Error Details
2458      ##
2459      if len(msg_parts) > 4:
2460         self.errordetails = msg_parts[4]
2461      else:
2462         self.errordetails = None
2463
2464
2465   def _handleMessage(self):
2466      """
2467      Fire Call Error Deferred.
2468      """
2469      ##
2470      ## Pop and process Call Deferred
2471      d = self.proto.calls.pop(self.callid, None)
2472      if d:
2473         e = Exception(self.erroruri, self.errordesc, self.errordetails)
2474         d.errback(e)
2475      else:
2476         if self.proto.debugWamp:
2477            log.msg("callid not found for received call error message")
2478