1############################################################################### 2## 3## Copyright (C) 2011-2014 Tavendo GmbH 4## 5## Licensed under the Apache License, Version 2.0 (the "License"); 6## you may not use this file except in compliance with the License. 7## You may obtain a copy of the License at 8## 9## http://www.apache.org/licenses/LICENSE-2.0 10## 11## Unless required by applicable law or agreed to in writing, software 12## distributed under the License is distributed on an "AS IS" BASIS, 13## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14## See the License for the specific language governing permissions and 15## limitations under the License. 16## 17############################################################################### 18 19from __future__ import absolute_import 20 21__all__ = ["createWsUrl", 22 "parseWsUrl", 23 24 "ConnectionRequest", 25 "ConnectionResponse", 26 "Timings", 27 28 "WebSocketProtocol", 29 "WebSocketFactory", 30 "WebSocketServerProtocol", 31 "WebSocketServerFactory", 32 "WebSocketClientProtocol", 33 "WebSocketClientFactory"] 34 35import binascii 36import hashlib 37import base64 38import struct 39import random 40import os 41import pickle 42import copy 43import json 44import six 45 46from pprint import pformat 47from collections import deque 48 49from autobahn import __version__ 50 51from autobahn.websocket.interfaces import IWebSocketChannel, \ 52 IWebSocketChannelFrameApi, \ 53 IWebSocketChannelStreamingApi 54 55from autobahn.util import Stopwatch 56from autobahn.websocket.utf8validator import Utf8Validator 57from autobahn.websocket.xormasker import XorMaskerNull, createXorMasker 58from autobahn.websocket.compress import * 59from autobahn.websocket import http 60 61from six.moves import urllib 62 63## The Python urlparse module currently does not contain the ws/wss 64## schemes, so we add those dynamically (which is a hack of course). 65## Since the urllib from six.moves does not seem to expose the stuff 66## we monkey patch here, we do it manually. 67## 68## Important: if you change this stuff (you shouldn't), make sure 69## _all_ our unit tests for WS URLs succeed 70## 71if not six.PY3: 72 ## Python 2 73 import urlparse 74else: 75 ## Python 3 76 from urllib import parse as urlparse 77 78wsschemes = ["ws", "wss"] 79urlparse.uses_relative.extend(wsschemes) 80urlparse.uses_netloc.extend(wsschemes) 81urlparse.uses_params.extend(wsschemes) 82urlparse.uses_query.extend(wsschemes) 83urlparse.uses_fragment.extend(wsschemes) 84 85 86 87def createWsUrl(hostname, port = None, isSecure = False, path = None, params = None): 88 """ 89 Create a WebSocket URL from components. 90 91 :param hostname: WebSocket server hostname. 92 :type hostname: str 93 :param port: WebSocket service port or None (to select default ports 80/443 depending on isSecure). 94 :type port: int 95 :param isSecure: Set True for secure WebSocket ("wss" scheme). 96 :type isSecure: bool 97 :param path: Path component of addressed resource (will be properly URL escaped). 98 :type path: str 99 :param params: A dictionary of key-values to construct the query component of the addressed resource (will be properly URL escaped). 100 :type params: dict 101 102 :returns: str -- Constructed WebSocket URL. 103 """ 104 if port is not None: 105 netloc = "%s:%d" % (hostname, port) 106 else: 107 if isSecure: 108 netloc = "%s:443" % hostname 109 else: 110 netloc = "%s:80" % hostname 111 if isSecure: 112 scheme = "wss" 113 else: 114 scheme = "ws" 115 if path is not None: 116 ppath = urllib.parse.quote(path) 117 else: 118 ppath = "/" 119 if params is not None: 120 query = urllib.parse.urlencode(params) 121 else: 122 query = None 123 return urllib.parse.urlunparse((scheme, netloc, ppath, None, query, None)) 124 125 126 127def parseWsUrl(url): 128 """ 129 Parses as WebSocket URL into it's components and returns a tuple (isSecure, host, port, resource, path, params). 130 131 isSecure is a flag which is True for wss URLs. 132 host is the hostname or IP from the URL. 133 port is the port from the URL or standard port derived from scheme (ws = 80, wss = 443). 134 resource is the /resource name/ from the URL, the /path/ together with the (optional) /query/ component. 135 path is the /path/ component properly unescaped. 136 params is the /query) component properly unescaped and returned as dictionary. 137 138 :param url: A valid WebSocket URL, i.e. `ws://localhost:9000/myresource?param1=23¶m2=666` 139 :type url: str 140 141 :returns: tuple -- A tuple (isSecure, host, port, resource, path, params) 142 """ 143 parsed = urlparse.urlparse(url) 144 if not parsed.hostname or parsed.hostname == "": 145 raise Exception("invalid WebSocket URL: missing hostname") 146 if parsed.scheme not in ["ws", "wss"]: 147 raise Exception("invalid WebSocket URL: bogus protocol scheme '%s'" % parsed.scheme) 148 if parsed.port is None or parsed.port == "": 149 if parsed.scheme == "ws": 150 port = 80 151 else: 152 port = 443 153 else: 154 port = int(parsed.port) 155 if parsed.fragment is not None and parsed.fragment != "": 156 raise Exception("invalid WebSocket URL: non-empty fragment '%s" % parsed.fragment) 157 if parsed.path is not None and parsed.path != "": 158 ppath = parsed.path 159 path = urllib.parse.unquote(ppath) 160 else: 161 ppath = "/" 162 path = ppath 163 if parsed.query is not None and parsed.query != "": 164 resource = ppath + "?" + parsed.query 165 params = urlparse.parse_qs(parsed.query) 166 else: 167 resource = ppath 168 params = {} 169 return (parsed.scheme == "wss", parsed.hostname, port, resource, path, params) 170 171 172 173class TrafficStats: 174 175 def __init__(self): 176 self.reset() 177 178 179 def reset(self): 180 ## all of the following only tracks data messages, not control frames, not handshaking 181 ## 182 self.outgoingOctetsWireLevel = 0 183 self.outgoingOctetsWebSocketLevel = 0 184 self.outgoingOctetsAppLevel = 0 185 self.outgoingWebSocketFrames = 0 186 self.outgoingWebSocketMessages = 0 187 188 self.incomingOctetsWireLevel = 0 189 self.incomingOctetsWebSocketLevel = 0 190 self.incomingOctetsAppLevel = 0 191 self.incomingWebSocketFrames = 0 192 self.incomingWebSocketMessages = 0 193 194 ## the following track any traffic before the WebSocket connection 195 ## reaches STATE_OPEN (this includes WebSocket opening handshake 196 ## proxy handling and such) 197 ## 198 self.preopenOutgoingOctetsWireLevel = 0 199 self.preopenIncomingOctetsWireLevel = 0 200 201 202 def __json__(self): 203 204 ## compression ratio = compressed size / uncompressed size 205 ## 206 if self.outgoingOctetsAppLevel > 0: 207 outgoingCompressionRatio = float(self.outgoingOctetsWebSocketLevel) / float(self.outgoingOctetsAppLevel) 208 else: 209 outgoingCompressionRatio = None 210 if self.incomingOctetsAppLevel > 0: 211 incomingCompressionRatio = float(self.incomingOctetsWebSocketLevel) / float(self.incomingOctetsAppLevel) 212 else: 213 incomingCompressionRatio = None 214 215 ## protocol overhead = non-payload size / payload size 216 ## 217 if self.outgoingOctetsWebSocketLevel > 0: 218 outgoingWebSocketOverhead = float(self.outgoingOctetsWireLevel - self.outgoingOctetsWebSocketLevel) / float(self.outgoingOctetsWebSocketLevel) 219 else: 220 outgoingWebSocketOverhead = None 221 if self.incomingOctetsWebSocketLevel > 0: 222 incomingWebSocketOverhead = float(self.incomingOctetsWireLevel - self.incomingOctetsWebSocketLevel) / float(self.incomingOctetsWebSocketLevel) 223 else: 224 incomingWebSocketOverhead = None 225 226 return {'outgoingOctetsWireLevel': self.outgoingOctetsWireLevel, 227 'outgoingOctetsWebSocketLevel': self.outgoingOctetsWebSocketLevel, 228 'outgoingOctetsAppLevel': self.outgoingOctetsAppLevel, 229 'outgoingCompressionRatio': outgoingCompressionRatio, 230 'outgoingWebSocketOverhead': outgoingWebSocketOverhead, 231 'outgoingWebSocketFrames': self.outgoingWebSocketFrames, 232 'outgoingWebSocketMessages': self.outgoingWebSocketMessages, 233 'preopenOutgoingOctetsWireLevel': self.preopenOutgoingOctetsWireLevel, 234 235 'incomingOctetsWireLevel': self.incomingOctetsWireLevel, 236 'incomingOctetsWebSocketLevel': self.incomingOctetsWebSocketLevel, 237 'incomingOctetsAppLevel': self.incomingOctetsAppLevel, 238 'incomingCompressionRatio': incomingCompressionRatio, 239 'incomingWebSocketOverhead': incomingWebSocketOverhead, 240 'incomingWebSocketFrames': self.incomingWebSocketFrames, 241 'incomingWebSocketMessages': self.incomingWebSocketMessages, 242 'preopenIncomingOctetsWireLevel': self.preopenIncomingOctetsWireLevel} 243 244 245 def __str__(self): 246 return json.dumps(self.__json__()) 247 248 249 250class FrameHeader: 251 """ 252 Thin-wrapper for storing WebSocket frame metadata. 253 254 FOR INTERNAL USE ONLY! 255 """ 256 257 def __init__(self, opcode, fin, rsv, length, mask): 258 """ 259 Constructor. 260 261 :param opcode: Frame opcode (0-15). 262 :type opcode: int 263 :param fin: Frame FIN flag. 264 :type fin: bool 265 :param rsv: Frame reserved flags (0-7). 266 :type rsv: int 267 :param length: Frame payload length. 268 :type length: int 269 :param mask: Frame mask (binary string) or None. 270 :type mask: str 271 """ 272 self.opcode = opcode 273 self.fin = fin 274 self.rsv = rsv 275 self.length = length 276 self.mask = mask 277 278 279 280class ConnectionRequest: 281 """ 282 Thin-wrapper for WebSocket connection request information provided in 283 :meth:`autobahn.websocket.protocol.WebSocketServerProtocol.onConnect` when 284 a WebSocket client want to establish a connection to a WebSocket server. 285 """ 286 def __init__(self, peer, headers, host, path, params, version, origin, protocols, extensions): 287 """ 288 Constructor. 289 290 :param peer: Descriptor of the connecting client (e.g. IP address/port in case of TCP transports). 291 :type peer: str 292 :param headers: HTTP headers from opening handshake request. 293 :type headers: dict 294 :param host: Host from opening handshake HTTP header. 295 :type host: str 296 :param path: Path from requested HTTP resource URI. For example, a resource URI of `/myservice?foo=23&foo=66&bar=2` will be parsed to `/myservice`. 297 :type path: str 298 :param params: Query parameters (if any) from requested HTTP resource URI. For example, a resource URI of `/myservice?foo=23&foo=66&bar=2` will be parsed to `{'foo': ['23', '66'], 'bar': ['2']}`. 299 :type params: dict of arrays of strings 300 :param version: The WebSocket protocol version the client announced (and will be spoken, when connection is accepted). 301 :type version: int 302 :param origin: The WebSocket origin header or None. Note that this only a reliable source of information for browser clients! 303 :type origin: str 304 :param protocols: The WebSocket (sub)protocols the client announced. You must select and return one of those (or None) in :meth:`autobahn.websocket.WebSocketServerProtocol.onConnect`. 305 :type protocols: array of strings 306 :param extensions: The WebSocket extensions the client requested and the server accepted (and thus will be spoken, when WS connection is established). 307 :type extensions: array of strings 308 """ 309 self.peer = peer 310 self.headers = headers 311 self.host = host 312 self.path = path 313 self.params = params 314 self.version = version 315 self.origin = origin 316 self.protocols = protocols 317 self.extensions = extensions 318 319 320 def __json__(self): 321 return {'peer': self.peer, 322 'headers': self.headers, 323 'host': self.host, 324 'path': self.path, 325 'params': self.params, 326 'version': self.version, 327 'origin': self.origin, 328 'protocols': self.protocols, 329 'extensions': self.extensions} 330 331 332 def __str__(self): 333 return json.dumps(self.__json__()) 334 335 336 337class ConnectionResponse: 338 """ 339 Thin-wrapper for WebSocket connection response information provided in 340 :meth:`autobahn.websocket.protocol.WebSocketClientProtocol.onConnect` when 341 a WebSocket server has accepted a connection request by a client. 342 """ 343 def __init__(self, peer, headers, version, protocol, extensions): 344 """ 345 Constructor. 346 347 :param peer: Descriptor of the connected server (e.g. IP address/port in case of TCP transport). 348 :type peer: str 349 :param headers: HTTP headers from opening handshake response. 350 :type headers: dict 351 :param version: The WebSocket protocol version that is spoken. 352 :type version: int 353 :param protocol: The WebSocket (sub)protocol in use. 354 :type protocol: str 355 :param extensions: The WebSocket extensions in use. 356 :type extensions: array of strings 357 """ 358 self.peer = peer 359 self.headers = headers 360 self.version = version 361 self.protocol = protocol 362 self.extensions = extensions 363 364 365 def __json__(self): 366 return {'peer': self.peer, 367 'headers': self.headers, 368 'version': self.version, 369 'protocol': self.protocol, 370 'extensions': self.extensions} 371 372 373 def __str__(self): 374 return json.dumps(self.__json__()) 375 376 377 378def parseHttpHeader(data): 379 """ 380 Parses the beginning of a HTTP request header (the data up to the \n\n line) into a pair 381 of status line and HTTP headers dictionary. 382 Header keys are normalized to all-lower-case. 383 384 FOR INTERNAL USE ONLY! 385 386 :param data: The HTTP header data up to the \n\n line. 387 :type data: str 388 389 :returns: tuple -- Tuple of HTTP status line, headers and headers count. 390 """ 391 raw = data.decode('utf8').splitlines() 392 http_status_line = raw[0].strip() 393 http_headers = {} 394 http_headers_cnt = {} 395 for h in raw[1:]: 396 i = h.find(":") 397 if i > 0: 398 ## HTTP header keys are case-insensitive 399 key = h[:i].strip().lower() 400 401 ## not sure if UTF-8 is allowed for HTTP header values.. 402 value = h[i+1:].strip() 403 404 ## handle HTTP headers split across multiple lines 405 if key in http_headers: 406 http_headers[key] += ", %s" % value 407 http_headers_cnt[key] += 1 408 else: 409 http_headers[key] = value 410 http_headers_cnt[key] = 1 411 else: 412 # skip bad HTTP header 413 pass 414 return (http_status_line, http_headers, http_headers_cnt) 415 416 417 418class Timings: 419 """ 420 Helper class to track timings by key. This class also supports item access, 421 iteration and conversion to string. 422 """ 423 424 def __init__(self): 425 self._stopwatch = Stopwatch() 426 self._timings = {} 427 428 def track(self, key): 429 """ 430 Track elapsed for key. 431 432 :param key: Key under which to track the timing. 433 :type key: str 434 """ 435 self._timings[key] = self._stopwatch.elapsed() 436 437 def diff(self, startKey, endKey, format = True): 438 """ 439 Get elapsed difference between two previously tracked keys. 440 441 :param startKey: First key for interval (older timestamp). 442 :type startKey: str 443 :param endKey: Second key for interval (younger timestamp). 444 :type endKey: str 445 :param format: If `True`, format computed time period and return string. 446 :type format: bool 447 448 :returns: float or str -- Computed time period in seconds (or formatted string). 449 """ 450 if endKey in self._timings and startKey in self._timings: 451 d = self._timings[endKey] - self._timings[startKey] 452 if format: 453 if d < 0.00001: # 10us 454 s = "%d ns" % round(d * 1000000000.) 455 elif d < 0.01: # 10ms 456 s = "%d us" % round(d * 1000000.) 457 elif d < 10: # 10s 458 s = "%d ms" % round(d * 1000.) 459 else: 460 s = "%d s" % round(d) 461 return s.rjust(8) 462 else: 463 return d 464 else: 465 if format: 466 return "n.a.".rjust(8) 467 else: 468 return None 469 470 def __getitem__(self, key): 471 return self._timings.get(key, None) 472 473 def __iter__(self): 474 return self._timings.__iter__() 475 476 def __str__(self): 477 return pformat(self._timings) 478 479 480 481class WebSocketProtocol: 482 """ 483 Protocol base class for WebSocket. 484 485 This class implements: 486 487 * :class:`autobahn.websocket.interfaces.IWebSocketChannel` 488 * :class:`autobahn.websocket.interfaces.IWebSocketChannelFrameApi` 489 * :class:`autobahn.websocket.interfaces.IWebSocketChannelStreamingApi` 490 """ 491 492 SUPPORTED_SPEC_VERSIONS = [0, 10, 11, 12, 13, 14, 15, 16, 17, 18] 493 """ 494 WebSocket protocol spec (draft) versions supported by this implementation. 495 Use of version 18 indicates RFC6455. Use of versions < 18 indicate actual 496 draft spec versions (Hybi-Drafts). Use of version 0 indicates Hixie-76. 497 """ 498 499 SUPPORTED_PROTOCOL_VERSIONS = [0, 8, 13] 500 """ 501 WebSocket protocol versions supported by this implementation. For Hixie-76, 502 there is no protocol version announced in HTTP header, and we just use the 503 draft version (0) in this case. 504 """ 505 506 SPEC_TO_PROTOCOL_VERSION = {0: 0, 10: 8, 11: 8, 12: 8, 13: 13, 14: 13, 15: 13, 16: 13, 17: 13, 18: 13} 507 """ 508 Mapping from protocol spec (draft) version to protocol version. For Hixie-76, 509 there is no protocol version announced in HTTP header, and we just use the 510 pseudo protocol version 0 in this case. 511 """ 512 513 PROTOCOL_TO_SPEC_VERSION = {0: 0, 8: 12, 13: 18} 514 """ 515 Mapping from protocol version to the latest protocol spec (draft) version 516 using that protocol version. For Hixie-76, there is no protocol version 517 announced in HTTP header, and we just use the draft version (0) in this case. 518 """ 519 520 DEFAULT_SPEC_VERSION = 18 521 """ 522 Default WebSocket protocol spec version this implementation speaks: final RFC6455. 523 """ 524 525 DEFAULT_ALLOW_HIXIE76 = False 526 """ 527 By default, this implementation will not allow to speak the obsoleted 528 Hixie-76 protocol version. That protocol version has security issues, but 529 is still spoken by some clients. Enable at your own risk! Enabling can be 530 done by using setProtocolOptions() on the factories for clients and servers. 531 """ 532 533 _WS_MAGIC = b"258EAFA5-E914-47DA-95CA-C5AB0DC85B11" 534 """ 535 Protocol defined magic used during WebSocket handshake (used in Hybi-drafts 536 and final RFC6455. 537 """ 538 539 _QUEUED_WRITE_DELAY = 0.00001 540 """ 541 For synched/chopped writes, this is the reactor reentry delay in seconds. 542 """ 543 544 MESSAGE_TYPE_TEXT = 1 545 """ 546 WebSocket text message type (UTF-8 payload). 547 """ 548 549 MESSAGE_TYPE_BINARY = 2 550 """ 551 WebSocket binary message type (arbitrary binary payload). 552 """ 553 554 ## WebSocket protocol state: 555 ## (STATE_PROXY_CONNECTING) => STATE_CONNECTING => STATE_OPEN => STATE_CLOSING => STATE_CLOSED 556 ## 557 STATE_CLOSED = 0 558 STATE_CONNECTING = 1 559 STATE_CLOSING = 2 560 STATE_OPEN = 3 561 STATE_PROXY_CONNECTING = 4 562 563 ## Streaming Send State 564 SEND_STATE_GROUND = 0 565 SEND_STATE_MESSAGE_BEGIN = 1 566 SEND_STATE_INSIDE_MESSAGE = 2 567 SEND_STATE_INSIDE_MESSAGE_FRAME = 3 568 569 ## WebSocket protocol close codes 570 ## 571 CLOSE_STATUS_CODE_NORMAL = 1000 572 """Normal close of connection.""" 573 574 CLOSE_STATUS_CODE_GOING_AWAY = 1001 575 """Going away.""" 576 577 CLOSE_STATUS_CODE_PROTOCOL_ERROR = 1002 578 """Protocol error.""" 579 580 CLOSE_STATUS_CODE_UNSUPPORTED_DATA = 1003 581 """Unsupported data.""" 582 583 CLOSE_STATUS_CODE_RESERVED1 = 1004 584 """RESERVED""" 585 586 CLOSE_STATUS_CODE_NULL = 1005 # MUST NOT be set in close frame! 587 """No status received. (MUST NOT be used as status code when sending a close).""" 588 589 CLOSE_STATUS_CODE_ABNORMAL_CLOSE = 1006 # MUST NOT be set in close frame! 590 """Abnormal close of connection. (MUST NOT be used as status code when sending a close).""" 591 592 CLOSE_STATUS_CODE_INVALID_PAYLOAD = 1007 593 """Invalid frame payload data.""" 594 595 CLOSE_STATUS_CODE_POLICY_VIOLATION = 1008 596 """Policy violation.""" 597 598 CLOSE_STATUS_CODE_MESSAGE_TOO_BIG = 1009 599 """Message too big.""" 600 601 CLOSE_STATUS_CODE_MANDATORY_EXTENSION = 1010 602 """Mandatory extension.""" 603 604 CLOSE_STATUS_CODE_INTERNAL_ERROR = 1011 605 """The peer encountered an unexpected condition or internal error.""" 606 607 CLOSE_STATUS_CODE_TLS_HANDSHAKE_FAILED = 1015 # MUST NOT be set in close frame! 608 """TLS handshake failed, i.e. server certificate could not be verified. (MUST NOT be used as status code when sending a close).""" 609 610 CLOSE_STATUS_CODES_ALLOWED = [CLOSE_STATUS_CODE_NORMAL, 611 CLOSE_STATUS_CODE_GOING_AWAY, 612 CLOSE_STATUS_CODE_PROTOCOL_ERROR, 613 CLOSE_STATUS_CODE_UNSUPPORTED_DATA, 614 CLOSE_STATUS_CODE_INVALID_PAYLOAD, 615 CLOSE_STATUS_CODE_POLICY_VIOLATION, 616 CLOSE_STATUS_CODE_MESSAGE_TOO_BIG, 617 CLOSE_STATUS_CODE_MANDATORY_EXTENSION, 618 CLOSE_STATUS_CODE_INTERNAL_ERROR] 619 """Status codes allowed to send in close.""" 620 621 622 CONFIG_ATTRS_COMMON = ['debug', 623 'debugCodePaths', 624 'logOctets', 625 'logFrames', 626 'trackTimings', 627 'allowHixie76', 628 'utf8validateIncoming', 629 'applyMask', 630 'maxFramePayloadSize', 631 'maxMessagePayloadSize', 632 'autoFragmentSize', 633 'failByDrop', 634 'echoCloseCodeReason', 635 'openHandshakeTimeout', 636 'closeHandshakeTimeout', 637 'tcpNoDelay'] 638 """ 639 Configuration attributes common to servers and clients. 640 """ 641 642 CONFIG_ATTRS_SERVER = ['versions', 643 'webStatus', 644 'requireMaskedClientFrames', 645 'maskServerFrames', 646 'perMessageCompressionAccept'] 647 """ 648 Configuration attributes specific to servers. 649 """ 650 651 CONFIG_ATTRS_CLIENT = ['version', 652 'acceptMaskedServerFrames', 653 'maskClientFrames', 654 'serverConnectionDropTimeout', 655 'perMessageCompressionOffers', 656 'perMessageCompressionAccept'] 657 """ 658 Configuration attributes specific to clients. 659 """ 660 661 662 def onOpen(self): 663 """ 664 Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.onOpen` 665 """ 666 if self.debugCodePaths: 667 self.factory._log("WebSocketProtocol.onOpen") 668 669 670 def onMessageBegin(self, isBinary): 671 """ 672 Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.onMessageBegin` 673 """ 674 self.message_is_binary = isBinary 675 self.message_data = [] 676 self.message_data_total_length = 0 677 678 679 def onMessageFrameBegin(self, length): 680 """ 681 Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.onMessageFrameBegin` 682 """ 683 self.frame_length = length 684 self.frame_data = [] 685 self.message_data_total_length += length 686 if not self.failedByMe: 687 if self.maxMessagePayloadSize > 0 and self.message_data_total_length > self.maxMessagePayloadSize: 688 self.wasMaxMessagePayloadSizeExceeded = True 689 self.failConnection(WebSocketProtocol.CLOSE_STATUS_CODE_MESSAGE_TOO_BIG, "message exceeds payload limit of %d octets" % self.maxMessagePayloadSize) 690 elif self.maxFramePayloadSize > 0 and length > self.maxFramePayloadSize: 691 self.wasMaxFramePayloadSizeExceeded = True 692 self.failConnection(WebSocketProtocol.CLOSE_STATUS_CODE_POLICY_VIOLATION, "frame exceeds payload limit of %d octets" % self.maxFramePayloadSize) 693 694 695 def onMessageFrameData(self, payload): 696 """ 697 Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.onMessageFrameData` 698 """ 699 if not self.failedByMe: 700 if self.websocket_version == 0: 701 self.message_data_total_length += len(payload) 702 if self.maxMessagePayloadSize > 0 and self.message_data_total_length > self.maxMessagePayloadSize: 703 self.wasMaxMessagePayloadSizeExceeded = True 704 self.failConnection(WebSocketProtocol.CLOSE_STATUS_CODE_MESSAGE_TOO_BIG, "message exceeds payload limit of %d octets" % self.maxMessagePayloadSize) 705 self.message_data.append(payload) 706 else: 707 self.frame_data.append(payload) 708 709 710 def onMessageFrameEnd(self): 711 """ 712 Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.onMessageFrameEnd` 713 """ 714 if not self.failedByMe: 715 self._onMessageFrame(self.frame_data) 716 717 self.frame_data = None 718 719 720 def onMessageFrame(self, payload): 721 """ 722 Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.onMessageFrame` 723 """ 724 if not self.failedByMe: 725 self.message_data.extend(payload) 726 727 728 def onMessageEnd(self): 729 """ 730 Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.onMessageEnd` 731 """ 732 if not self.failedByMe: 733 payload = b''.join(self.message_data) 734 if self.trackedTimings: 735 self.trackedTimings.track("onMessage") 736 self._onMessage(payload, self.message_is_binary) 737 738 self.message_data = None 739 740 741 def onMessage(self, payload, isBinary): 742 """ 743 Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.onMessage` 744 """ 745 if self.debug: 746 self.factory._log("WebSocketProtocol.onMessage") 747 748 749 def onPing(self, payload): 750 """ 751 Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.onPing` 752 """ 753 if self.debug: 754 self.factory._log("WebSocketProtocol.onPing") 755 if self.state == WebSocketProtocol.STATE_OPEN: 756 self.sendPong(payload) 757 758 759 def onPong(self, payload): 760 """ 761 Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.onPong` 762 """ 763 if self.debug: 764 self.factory._log("WebSocketProtocol.onPong") 765 766 767 def onClose(self, wasClean, code, reason): 768 """ 769 Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.onClose` 770 """ 771 if self.debugCodePaths: 772 s = "WebSocketProtocol.onClose:\n" 773 s += "wasClean=%s\n" % wasClean 774 s += "code=%s\n" % code 775 s += "reason=%s\n" % reason 776 s += "self.closedByMe=%s\n" % self.closedByMe 777 s += "self.failedByMe=%s\n" % self.failedByMe 778 s += "self.droppedByMe=%s\n" % self.droppedByMe 779 s += "self.wasClean=%s\n" % self.wasClean 780 s += "self.wasNotCleanReason=%s\n" % self.wasNotCleanReason 781 s += "self.localCloseCode=%s\n" % self.localCloseCode 782 s += "self.localCloseReason=%s\n" % self.localCloseReason 783 s += "self.remoteCloseCode=%s\n" % self.remoteCloseCode 784 s += "self.remoteCloseReason=%s\n" % self.remoteCloseReason 785 self.factory._log(s) 786 787 788 def onCloseFrame(self, code, reasonRaw): 789 """ 790 Callback when a Close frame was received. The default implementation answers by 791 sending a Close when no Close was sent before. Otherwise it drops 792 the TCP connection either immediately (when we are a server) or after a timeout 793 (when we are a client and expect the server to drop the TCP). 794 795 Modes: Hybi, Hixie 796 797 Notes: 798 - For Hixie mode, this method is slightly misnamed for historic reasons. 799 - For Hixie mode, code and reasonRaw are silently ignored. 800 801 :param code: None or close status code, if there was one (:class:`WebSocketProtocol`.CLOSE_STATUS_CODE_*). 802 :type code: int 803 :param reason: None or close reason (when present, a status code MUST have been also be present). 804 :type reason: str 805 """ 806 if self.debugCodePaths: 807 self.factory._log("WebSocketProtocol.onCloseFrame") 808 809 self.remoteCloseCode = code 810 811 ## reserved close codes: 0-999, 1004, 1005, 1006, 1011-2999, >= 5000 812 ## 813 if code is not None and (code < 1000 or (code >= 1000 and code <= 2999 and code not in WebSocketProtocol.CLOSE_STATUS_CODES_ALLOWED) or code >= 5000): 814 if self.protocolViolation("invalid close code %d" % code): 815 return True 816 817 ## closing reason 818 ## 819 if reasonRaw is not None: 820 ## we use our own UTF-8 validator to get consistent and fully conformant 821 ## UTF-8 validation behavior 822 u = Utf8Validator() 823 val = u.validate(reasonRaw) 824 if not val[0]: 825 if self.invalidPayload("invalid close reason (non-UTF-8 payload)"): 826 return True 827 self.remoteCloseReason = reasonRaw.decode('utf8') 828 829 if self.state == WebSocketProtocol.STATE_CLOSING: 830 ## We already initiated the closing handshake, so this 831 ## is the peer's reply to our close frame. 832 833 ## cancel any closing HS timer if present 834 ## 835 if self.closeHandshakeTimeoutCall is not None: 836 if self.debugCodePaths: 837 self.factory._log("closeHandshakeTimeoutCall.cancel") 838 self.closeHandshakeTimeoutCall.cancel() 839 self.closeHandshakeTimeoutCall = None 840 841 self.wasClean = True 842 843 if self.factory.isServer: 844 ## When we are a server, we immediately drop the TCP. 845 self.dropConnection(abort = True) 846 else: 847 ## When we are a client, the server should drop the TCP 848 ## If that doesn't happen, we do. And that will set wasClean = False. 849 if self.serverConnectionDropTimeout > 0: 850 self.serverConnectionDropTimeoutCall = self.factory._callLater(self.serverConnectionDropTimeout, self.onServerConnectionDropTimeout) 851 852 elif self.state == WebSocketProtocol.STATE_OPEN: 853 ## The peer initiates a closing handshake, so we reply 854 ## by sending close frame. 855 856 self.wasClean = True 857 858 if self.websocket_version == 0: 859 self.sendCloseFrame(isReply = True) 860 else: 861 ## Either reply with same code/reason, or code == NORMAL/reason=None 862 if self.echoCloseCodeReason: 863 self.sendCloseFrame(code = code, reasonUtf8 = reason.encode("UTF-8"), isReply = True) 864 else: 865 self.sendCloseFrame(code = WebSocketProtocol.CLOSE_STATUS_CODE_NORMAL, isReply = True) 866 867 if self.factory.isServer: 868 ## When we are a server, we immediately drop the TCP. 869 self.dropConnection(abort = False) 870 else: 871 ## When we are a client, we expect the server to drop the TCP, 872 ## and when the server fails to do so, a timeout in sendCloseFrame() 873 ## will set wasClean = False back again. 874 pass 875 876 else: 877 ## STATE_PROXY_CONNECTING, STATE_CONNECTING, STATE_CLOSED 878 raise Exception("logic error") 879 880 881 def onServerConnectionDropTimeout(self): 882 """ 883 We (a client) expected the peer (a server) to drop the connection, 884 but it didn't (in time self.serverConnectionDropTimeout). 885 So we drop the connection, but set self.wasClean = False. 886 887 Modes: Hybi, Hixie 888 """ 889 self.serverConnectionDropTimeoutCall = None 890 if self.state != WebSocketProtocol.STATE_CLOSED: 891 if self.debugCodePaths: 892 self.factory._log("onServerConnectionDropTimeout") 893 self.wasClean = False 894 self.wasNotCleanReason = "server did not drop TCP connection (in time)" 895 self.wasServerConnectionDropTimeout = True 896 self.dropConnection(abort = True) 897 else: 898 if self.debugCodePaths: 899 self.factory._log("skipping onServerConnectionDropTimeout since connection is already closed") 900 901 902 def onOpenHandshakeTimeout(self): 903 """ 904 We expected the peer to complete the opening handshake with to us. 905 It didn't do so (in time self.openHandshakeTimeout). 906 So we drop the connection, but set self.wasClean = False. 907 908 Modes: Hybi, Hixie 909 """ 910 self.openHandshakeTimeoutCall = None 911 if self.state in [WebSocketProtocol.STATE_CONNECTING, WebSocketProtocol.STATE_PROXY_CONNECTING]: 912 if self.debugCodePaths: 913 self.factory._log("onOpenHandshakeTimeout fired") 914 self.wasClean = False 915 self.wasNotCleanReason = "peer did not finish (in time) the opening handshake" 916 self.wasOpenHandshakeTimeout = True 917 self.dropConnection(abort = True) 918 elif self.state == WebSocketProtocol.STATE_OPEN: 919 if self.debugCodePaths: 920 self.factory._log("skipping onOpenHandshakeTimeout since WebSocket connection is open (opening handshake already finished)") 921 elif self.state == WebSocketProtocol.STATE_CLOSING: 922 if self.debugCodePaths: 923 self.factory._log("skipping onOpenHandshakeTimeout since WebSocket connection is closing") 924 elif self.state == WebSocketProtocol.STATE_CLOSED: 925 if self.debugCodePaths: 926 self.factory._log("skipping onOpenHandshakeTimeout since WebSocket connection already closed") 927 else: 928 # should not arrive here 929 raise Exception("logic error") 930 931 932 def onCloseHandshakeTimeout(self): 933 """ 934 We expected the peer to respond to us initiating a close handshake. It didn't 935 respond (in time self.closeHandshakeTimeout) with a close response frame though. 936 So we drop the connection, but set self.wasClean = False. 937 938 Modes: Hybi, Hixie 939 """ 940 self.closeHandshakeTimeoutCall = None 941 if self.state != WebSocketProtocol.STATE_CLOSED: 942 if self.debugCodePaths: 943 self.factory._log("onCloseHandshakeTimeout fired") 944 self.wasClean = False 945 self.wasNotCleanReason = "peer did not respond (in time) in closing handshake" 946 self.wasCloseHandshakeTimeout = True 947 self.dropConnection(abort = True) 948 else: 949 if self.debugCodePaths: 950 self.factory._log("skipping onCloseHandshakeTimeout since connection is already closed") 951 952 953 def dropConnection(self, abort = False): 954 """ 955 Drop the underlying TCP connection. 956 957 Modes: Hybi, Hixie 958 """ 959 if self.state != WebSocketProtocol.STATE_CLOSED: 960 if self.debugCodePaths: 961 self.factory._log("dropping connection") 962 self.droppedByMe = True 963 self.state = WebSocketProtocol.STATE_CLOSED 964 965 self._closeConnection(abort) 966 else: 967 if self.debugCodePaths: 968 self.factory._log("skipping dropConnection since connection is already closed") 969 970 971 def failConnection(self, code = CLOSE_STATUS_CODE_GOING_AWAY, reason = "Going Away"): 972 """ 973 Fails the WebSocket connection. 974 975 Modes: Hybi, Hixie 976 977 Notes: 978 - For Hixie mode, the code and reason are silently ignored. 979 """ 980 if self.state != WebSocketProtocol.STATE_CLOSED: 981 if self.debugCodePaths: 982 self.factory._log("Failing connection : %s - %s" % (code, reason)) 983 984 self.failedByMe = True 985 986 if self.failByDrop: 987 ## brutally drop the TCP connection 988 self.wasClean = False 989 self.wasNotCleanReason = "I failed the WebSocket connection by dropping the TCP connection" 990 self.dropConnection(abort = True) 991 992 else: 993 if self.state != WebSocketProtocol.STATE_CLOSING: 994 ## perform WebSocket closing handshake 995 self.sendCloseFrame(code = code, reasonUtf8 = reason.encode("UTF-8")[:125-2], isReply = False) 996 else: 997 ## already performing closing handshake .. we now drop the TCP 998 ## (this can happen e.g. if we encounter a 2nd protocol violation during closing HS) 999 self.dropConnection(abort = False) 1000 1001 else: 1002 if self.debugCodePaths: 1003 self.factory._log("skipping failConnection since connection is already closed") 1004 1005 1006 def protocolViolation(self, reason): 1007 """ 1008 Fired when a WebSocket protocol violation/error occurs. 1009 1010 Modes: Hybi, Hixie 1011 1012 Notes: 1013 - For Hixie mode, reason is silently ignored. 1014 1015 :param reason: Protocol violation that was encountered (human readable). 1016 :type reason: str 1017 1018 :returns: bool -- True, when any further processing should be discontinued. 1019 """ 1020 if self.debugCodePaths: 1021 self.factory._log("Protocol violation : %s" % reason) 1022 self.failConnection(WebSocketProtocol.CLOSE_STATUS_CODE_PROTOCOL_ERROR, reason) 1023 if self.failByDrop: 1024 return True 1025 else: 1026 ## if we don't immediately drop the TCP, we need to skip the invalid frame 1027 ## to continue to later receive the closing handshake reply 1028 return False 1029 1030 1031 def invalidPayload(self, reason): 1032 """ 1033 Fired when invalid payload is encountered. Currently, this only happens 1034 for text message when payload is invalid UTF-8 or close frames with 1035 close reason that is invalid UTF-8. 1036 1037 Modes: Hybi, Hixie 1038 1039 Notes: 1040 - For Hixie mode, reason is silently ignored. 1041 1042 :param reason: What was invalid for the payload (human readable). 1043 :type reason: str 1044 1045 :returns: bool -- True, when any further processing should be discontinued. 1046 """ 1047 if self.debugCodePaths: 1048 self.factory._log("Invalid payload : %s" % reason) 1049 self.failConnection(WebSocketProtocol.CLOSE_STATUS_CODE_INVALID_PAYLOAD, reason) 1050 if self.failByDrop: 1051 return True 1052 else: 1053 ## if we don't immediately drop the TCP, we need to skip the invalid frame 1054 ## to continue to later receive the closing handshake reply 1055 return False 1056 1057 1058 def setTrackTimings(self, enable): 1059 """ 1060 Enable/disable tracking of detailed timings. 1061 1062 :param enable: Turn time tracking on/off. 1063 :type enable: bool 1064 """ 1065 if not hasattr(self, 'trackTimings') or self.trackTimings != enable: 1066 self.trackTimings = enable 1067 if self.trackTimings: 1068 self.trackedTimings = Timings() 1069 else: 1070 self.trackedTimings = None 1071 1072 1073 def _connectionMade(self): 1074 """ 1075 This is called by network framework when a new TCP connection has been established 1076 and handed over to a Protocol instance (an instance of this class). 1077 1078 Modes: Hybi, Hixie 1079 """ 1080 1081 ## copy default options from factory (so we are not affected by changed on 1082 ## those), but only copy if not already set on protocol instance (allow 1083 ## to set configuration individually) 1084 ## 1085 configAttrLog = [] 1086 for configAttr in self.CONFIG_ATTRS: 1087 if not hasattr(self, configAttr): 1088 setattr(self, configAttr, getattr(self.factory, configAttr)) 1089 configAttrSource = self.factory.__class__.__name__ 1090 else: 1091 configAttrSource = self.__class__.__name__ 1092 configAttrLog.append((configAttr, getattr(self, configAttr), configAttrSource)) 1093 1094 if self.debug: 1095 #self.factory._log(configAttrLog) 1096 self.factory._log("\n" + pformat(configAttrLog)) 1097 1098 ## permessage-compress extension 1099 self._perMessageCompress = None 1100 1101 ## Time tracking 1102 self.trackedTimings = None 1103 self.setTrackTimings(self.trackTimings) 1104 1105 ## Traffic stats 1106 self.trafficStats = TrafficStats() 1107 1108 ## initial state 1109 if not self.factory.isServer and self.factory.proxy is not None: 1110 self.state = WebSocketProtocol.STATE_PROXY_CONNECTING 1111 else: 1112 self.state = WebSocketProtocol.STATE_CONNECTING 1113 self.send_state = WebSocketProtocol.SEND_STATE_GROUND 1114 self.data = b"" 1115 1116 ## for chopped/synched sends, we need to queue to maintain 1117 ## ordering when recalling the reactor to actually "force" 1118 ## the octets to wire (see test/trickling in the repo) 1119 self.send_queue = deque() 1120 self.triggered = False 1121 1122 ## incremental UTF8 validator 1123 self.utf8validator = Utf8Validator() 1124 1125 ## track when frame/message payload sizes (incoming) were exceeded 1126 self.wasMaxFramePayloadSizeExceeded = False 1127 self.wasMaxMessagePayloadSizeExceeded = False 1128 1129 ## the following vars are related to connection close handling/tracking 1130 1131 # True, iff I have initiated closing HS (that is, did send close first) 1132 self.closedByMe = False 1133 1134 # True, iff I have failed the WS connection (i.e. due to protocol error) 1135 # Failing can be either by initiating close HS or brutal drop (this is 1136 # controlled by failByDrop option) 1137 self.failedByMe = False 1138 1139 # True, iff I dropped the TCP connection (called transport.loseConnection()) 1140 self.droppedByMe = False 1141 1142 # True, iff full WebSocket closing handshake was performed (close frame sent 1143 # and received) _and_ the server dropped the TCP (which is its responsibility) 1144 self.wasClean = False 1145 1146 # When self.wasClean = False, the reason (what happened) 1147 self.wasNotCleanReason = None 1148 1149 # When we are a client, and we expected the server to drop the TCP, but that 1150 # didn't happen in time, this gets True 1151 self.wasServerConnectionDropTimeout = False 1152 1153 # When the initial WebSocket opening handshake times out, this gets True 1154 self.wasOpenHandshakeTimeout = False 1155 1156 # When we initiated a closing handshake, but the peer did not respond in 1157 # time, this gets True 1158 self.wasCloseHandshakeTimeout = False 1159 1160 # The close code I sent in close frame (if any) 1161 self.localCloseCode = None 1162 1163 # The close reason I sent in close frame (if any) 1164 self.localCloseReason = None 1165 1166 # The close code the peer sent me in close frame (if any) 1167 self.remoteCloseCode = None 1168 1169 # The close reason the peer sent me in close frame (if any) 1170 self.remoteCloseReason = None 1171 1172 # timers, which might get set up later, and remembered here to get canceled 1173 # when appropriate 1174 if not self.factory.isServer: 1175 self.serverConnectionDropTimeoutCall = None 1176 self.openHandshakeTimeoutCall = None 1177 self.closeHandshakeTimeoutCall = None 1178 1179 # set opening handshake timeout handler 1180 if self.openHandshakeTimeout > 0: 1181 self.openHandshakeTimeoutCall = self.factory._callLater(self.openHandshakeTimeout, self.onOpenHandshakeTimeout) 1182 1183 1184 def _connectionLost(self, reason): 1185 """ 1186 This is called by network framework when a transport connection was lost. 1187 1188 Modes: Hybi, Hixie 1189 """ 1190 ## cancel any server connection drop timer if present 1191 ## 1192 if not self.factory.isServer and self.serverConnectionDropTimeoutCall is not None: 1193 if self.debugCodePaths: 1194 self.factory._log("serverConnectionDropTimeoutCall.cancel") 1195 self.serverConnectionDropTimeoutCall.cancel() 1196 self.serverConnectionDropTimeoutCall = None 1197 1198 self.state = WebSocketProtocol.STATE_CLOSED 1199 if not self.wasClean: 1200 if not self.droppedByMe and self.wasNotCleanReason is None: 1201 self.wasNotCleanReason = "peer dropped the TCP connection without previous WebSocket closing handshake" 1202 self._onClose(self.wasClean, WebSocketProtocol.CLOSE_STATUS_CODE_ABNORMAL_CLOSE, "connection was closed uncleanly (%s)" % self.wasNotCleanReason) 1203 else: 1204 self._onClose(self.wasClean, self.remoteCloseCode, self.remoteCloseReason) 1205 1206 1207 def logRxOctets(self, data): 1208 """ 1209 Hook fired right after raw octets have been received, but only when self.logOctets == True. 1210 1211 Modes: Hybi, Hixie 1212 """ 1213 self.factory._log("RX Octets from %s : octets = %s" % (self.peer, binascii.b2a_hex(data))) 1214 1215 1216 def logTxOctets(self, data, sync): 1217 """ 1218 Hook fired right after raw octets have been sent, but only when self.logOctets == True. 1219 1220 Modes: Hybi, Hixie 1221 """ 1222 self.factory._log("TX Octets to %s : sync = %s, octets = %s" % (self.peer, sync, binascii.b2a_hex(data))) 1223 1224 1225 def logRxFrame(self, frameHeader, payload): 1226 """ 1227 Hook fired right after WebSocket frame has been received and decoded, but only when self.logFrames == True. 1228 1229 Modes: Hybi 1230 """ 1231 data = b''.join(payload) 1232 info = (self.peer, 1233 frameHeader.fin, 1234 frameHeader.rsv, 1235 frameHeader.opcode, 1236 binascii.b2a_hex(frameHeader.mask) if frameHeader.mask else "-", 1237 frameHeader.length, 1238 data if frameHeader.opcode == 1 else binascii.b2a_hex(data)) 1239 1240 self.factory._log("RX Frame from %s : fin = %s, rsv = %s, opcode = %s, mask = %s, length = %s, payload = %s" % info) 1241 1242 1243 def logTxFrame(self, frameHeader, payload, repeatLength, chopsize, sync): 1244 """ 1245 Hook fired right after WebSocket frame has been encoded and sent, but only when self.logFrames == True. 1246 1247 Modes: Hybi 1248 """ 1249 info = (self.peer, 1250 frameHeader.fin, 1251 frameHeader.rsv, 1252 frameHeader.opcode, 1253 binascii.b2a_hex(frameHeader.mask) if frameHeader.mask else "-", 1254 frameHeader.length, 1255 repeatLength, 1256 chopsize, 1257 sync, 1258 payload if frameHeader.opcode == 1 else binascii.b2a_hex(payload)) 1259 1260 self.factory._log("TX Frame to %s : fin = %s, rsv = %s, opcode = %s, mask = %s, length = %s, repeat_length = %s, chopsize = %s, sync = %s, payload = %s" % info) 1261 1262 1263 def _dataReceived(self, data): 1264 """ 1265 This is called by network framework upon receiving data on transport connection. 1266 1267 Modes: Hybi, Hixie 1268 """ 1269 if self.state == WebSocketProtocol.STATE_OPEN: 1270 self.trafficStats.incomingOctetsWireLevel += len(data) 1271 elif self.state == WebSocketProtocol.STATE_CONNECTING or self.state == WebSocketProtocol.STATE_PROXY_CONNECTING: 1272 self.trafficStats.preopenIncomingOctetsWireLevel += len(data) 1273 1274 if self.logOctets: 1275 self.logRxOctets(data) 1276 self.data += data 1277 self.consumeData() 1278 1279 1280 def consumeData(self): 1281 """ 1282 Consume buffered (incoming) data. 1283 1284 Modes: Hybi, Hixie 1285 """ 1286 1287 ## WebSocket is open (handshake was completed) or close was sent 1288 ## 1289 if self.state == WebSocketProtocol.STATE_OPEN or self.state == WebSocketProtocol.STATE_CLOSING: 1290 1291 ## process until no more buffered data left or WS was closed 1292 ## 1293 while self.processData() and self.state != WebSocketProtocol.STATE_CLOSED: 1294 pass 1295 1296 ## need to establish proxy connection 1297 ## 1298 elif self.state == WebSocketProtocol.STATE_PROXY_CONNECTING: 1299 1300 self.processProxyConnect() 1301 1302 ## WebSocket needs handshake 1303 ## 1304 elif self.state == WebSocketProtocol.STATE_CONNECTING: 1305 1306 ## the implementation of processHandshake() in derived 1307 ## class needs to perform client or server handshake 1308 ## from other party here .. 1309 ## 1310 self.processHandshake() 1311 1312 ## we failed the connection .. don't process any more data! 1313 ## 1314 elif self.state == WebSocketProtocol.STATE_CLOSED: 1315 1316 ## ignore any data received after WS was closed 1317 ## 1318 if self.debugCodePaths: 1319 self.factory._log("received data in STATE_CLOSED") 1320 1321 ## should not arrive here (invalid state) 1322 ## 1323 else: 1324 raise Exception("invalid state") 1325 1326 1327 def processProxyConnect(self): 1328 """ 1329 Process proxy connect. 1330 1331 Modes: Hybi, Hixie 1332 """ 1333 raise Exception("must implement proxy connect (client or server) in derived class") 1334 1335 1336 def processHandshake(self): 1337 """ 1338 Process WebSocket handshake. 1339 1340 Modes: Hybi, Hixie 1341 """ 1342 raise Exception("must implement handshake (client or server) in derived class") 1343 1344 1345 def _trigger(self): 1346 """ 1347 Trigger sending stuff from send queue (which is only used for chopped/synched writes). 1348 1349 Modes: Hybi, Hixie 1350 """ 1351 if not self.triggered: 1352 self.triggered = True 1353 self._send() 1354 1355 1356 def _send(self): 1357 """ 1358 Send out stuff from send queue. For details how this works, see test/trickling 1359 in the repo. 1360 1361 Modes: Hybi, Hixie 1362 """ 1363 if len(self.send_queue) > 0: 1364 e = self.send_queue.popleft() 1365 1366 if self.state != WebSocketProtocol.STATE_CLOSED: 1367 1368 self.transport.write(e[0]) 1369 1370 if self.state == WebSocketProtocol.STATE_OPEN: 1371 self.trafficStats.outgoingOctetsWireLevel += len(e[0]) 1372 elif self.state == WebSocketProtocol.STATE_CONNECTING or self.state == WebSocketProtocol.STATE_PROXY_CONNECTING: 1373 self.trafficStats.preopenOutgoingOctetsWireLevel += len(e[0]) 1374 1375 if self.logOctets: 1376 self.logTxOctets(e[0], e[1]) 1377 else: 1378 if self.debugCodePaths: 1379 self.factory._log("skipped delayed write, since connection is closed") 1380 # we need to reenter the reactor to make the latter 1381 # reenter the OS network stack, so that octets 1382 # can get on the wire. Note: this is a "heuristic", 1383 # since there is no (easy) way to really force out 1384 # octets from the OS network stack to wire. 1385 self.factory._callLater(WebSocketProtocol._QUEUED_WRITE_DELAY, self._send) 1386 else: 1387 self.triggered = False 1388 1389 1390 def sendData(self, data, sync = False, chopsize = None): 1391 """ 1392 Wrapper for self.transport.write which allows to give a chopsize. 1393 When asked to chop up writing to TCP stream, we write only chopsize octets 1394 and then give up control to select() in underlying reactor so that bytes 1395 get onto wire immediately. Note that this is different from and unrelated 1396 to WebSocket data message fragmentation. Note that this is also different 1397 from the TcpNoDelay option which can be set on the socket. 1398 1399 Modes: Hybi, Hixie 1400 """ 1401 if chopsize and chopsize > 0: 1402 i = 0 1403 n = len(data) 1404 done = False 1405 while not done: 1406 j = i + chopsize 1407 if j >= n: 1408 done = True 1409 j = n 1410 self.send_queue.append((data[i:j], True)) 1411 i += chopsize 1412 self._trigger() 1413 else: 1414 if sync or len(self.send_queue) > 0: 1415 self.send_queue.append((data, sync)) 1416 self._trigger() 1417 else: 1418 self.transport.write(data) 1419 1420 if self.state == WebSocketProtocol.STATE_OPEN: 1421 self.trafficStats.outgoingOctetsWireLevel += len(data) 1422 elif self.state == WebSocketProtocol.STATE_CONNECTING or self.state == WebSocketProtocol.STATE_PROXY_CONNECTING: 1423 self.trafficStats.preopenOutgoingOctetsWireLevel += len(data) 1424 1425 if self.logOctets: 1426 self.logTxOctets(data, False) 1427 1428 1429 def sendPreparedMessage(self, preparedMsg): 1430 """ 1431 Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.sendPreparedMessage` 1432 """ 1433 if self.websocket_version != 0: 1434 if self._perMessageCompress is None or preparedMsg.doNotCompress: 1435 self.sendData(preparedMsg.payloadHybi) 1436 else: 1437 self.sendMessage(preparedMsg.payload, preparedMsg.binary) 1438 else: 1439 self.sendData(preparedMsg.payloadHixie) 1440 1441 1442 def processData(self): 1443 """ 1444 After WebSocket handshake has been completed, this procedure will do all 1445 subsequent processing of incoming bytes. 1446 1447 Modes: Hybi, Hixie 1448 """ 1449 if self.websocket_version == 0: 1450 return self.processDataHixie76() 1451 else: 1452 return self.processDataHybi() 1453 1454 1455 def processDataHixie76(self): 1456 """ 1457 Hixie-76 incoming data processing. 1458 1459 Modes: Hixie 1460 """ 1461 buffered_len = len(self.data) 1462 1463 ## outside a message, that is we are awaiting data which starts a new message 1464 ## 1465 if not self.inside_message: 1466 if buffered_len >= 2: 1467 1468 ## new message 1469 ## 1470 if self.data[0] == b'\x00': 1471 1472 self.inside_message = True 1473 1474 if self.utf8validateIncoming: 1475 self.utf8validator.reset() 1476 self.utf8validateIncomingCurrentMessage = True 1477 self.utf8validateLast = (True, True, 0, 0) 1478 else: 1479 self.utf8validateIncomingCurrentMessage = False 1480 1481 self.data = self.data[1:] 1482 if self.trackedTimings: 1483 self.trackedTimings.track("onMessageBegin") 1484 self._onMessageBegin(False) 1485 1486 ## Hixie close from peer received 1487 ## 1488 elif self.data[0] == b'\xff' and self.data[1] == b'\x00': 1489 self.onCloseFrame(None, None) 1490 self.data = self.data[2:] 1491 # stop receiving/processing after having received close! 1492 return False 1493 1494 ## malformed data 1495 ## 1496 else: 1497 if self.protocolViolation("malformed data received"): 1498 return False 1499 else: 1500 ## need more data 1501 return False 1502 1503 end_index = self.data.find(b'\xff') 1504 if end_index > 0: 1505 payload = self.data[:end_index] 1506 self.data = self.data[end_index + 1:] 1507 else: 1508 payload = self.data 1509 self.data = b'' 1510 1511 ## incrementally validate UTF-8 payload 1512 ## 1513 if self.utf8validateIncomingCurrentMessage: 1514 self.utf8validateLast = self.utf8validator.validate(payload) 1515 if not self.utf8validateLast[0]: 1516 if self.invalidPayload("encountered invalid UTF-8 while processing text message at payload octet index %d" % self.utf8validateLast[3]): 1517 return False 1518 1519 self._onMessageFrameData(payload) 1520 1521 if end_index > 0: 1522 self.inside_message = False 1523 self._onMessageEnd() 1524 1525 return len(self.data) > 0 1526 1527 1528 def processDataHybi(self): 1529 """ 1530 RFC6455/Hybi-Drafts incoming data processing. 1531 1532 Modes: Hybi 1533 """ 1534 buffered_len = len(self.data) 1535 1536 ## outside a frame, that is we are awaiting data which starts a new frame 1537 ## 1538 if self.current_frame is None: 1539 1540 ## need minimum of 2 octets to for new frame 1541 ## 1542 if buffered_len >= 2: 1543 1544 ## FIN, RSV, OPCODE 1545 ## 1546 if six.PY3: 1547 b = self.data[0] 1548 else: 1549 b = ord(self.data[0]) 1550 frame_fin = (b & 0x80) != 0 1551 frame_rsv = (b & 0x70) >> 4 1552 frame_opcode = b & 0x0f 1553 1554 ## MASK, PAYLOAD LEN 1 1555 ## 1556 if six.PY3: 1557 b = self.data[1] 1558 else: 1559 b = ord(self.data[1]) 1560 frame_masked = (b & 0x80) != 0 1561 frame_payload_len1 = b & 0x7f 1562 1563 ## MUST be 0 when no extension defining 1564 ## the semantics of RSV has been negotiated 1565 ## 1566 if frame_rsv != 0: 1567 if self._perMessageCompress is not None and frame_rsv == 4: 1568 pass 1569 else: 1570 if self.protocolViolation("RSV = %d and no extension negotiated" % frame_rsv): 1571 return False 1572 1573 ## all client-to-server frames MUST be masked 1574 ## 1575 if self.factory.isServer and self.requireMaskedClientFrames and not frame_masked: 1576 if self.protocolViolation("unmasked client-to-server frame"): 1577 return False 1578 1579 ## all server-to-client frames MUST NOT be masked 1580 ## 1581 if not self.factory.isServer and not self.acceptMaskedServerFrames and frame_masked: 1582 if self.protocolViolation("masked server-to-client frame"): 1583 return False 1584 1585 ## check frame 1586 ## 1587 if frame_opcode > 7: # control frame (have MSB in opcode set) 1588 1589 ## control frames MUST NOT be fragmented 1590 ## 1591 if not frame_fin: 1592 if self.protocolViolation("fragmented control frame"): 1593 return False 1594 1595 ## control frames MUST have payload 125 octets or less 1596 ## 1597 if frame_payload_len1 > 125: 1598 if self.protocolViolation("control frame with payload length > 125 octets"): 1599 return False 1600 1601 ## check for reserved control frame opcodes 1602 ## 1603 if frame_opcode not in [8, 9, 10]: 1604 if self.protocolViolation("control frame using reserved opcode %d" % frame_opcode): 1605 return False 1606 1607 ## close frame : if there is a body, the first two bytes of the body MUST be a 2-byte 1608 ## unsigned integer (in network byte order) representing a status code 1609 ## 1610 if frame_opcode == 8 and frame_payload_len1 == 1: 1611 if self.protocolViolation("received close control frame with payload len 1"): 1612 return False 1613 1614 ## control frames MUST NOT be compressed 1615 ## 1616 if self._perMessageCompress is not None and frame_rsv == 4: 1617 if self.protocolViolation("received compressed control frame [%s]" % self._perMessageCompress.EXTENSION_NAME): 1618 return False 1619 1620 else: # data frame 1621 1622 ## check for reserved data frame opcodes 1623 ## 1624 if frame_opcode not in [0, 1, 2]: 1625 if self.protocolViolation("data frame using reserved opcode %d" % frame_opcode): 1626 return False 1627 1628 ## check opcode vs message fragmentation state 1/2 1629 ## 1630 if not self.inside_message and frame_opcode == 0: 1631 if self.protocolViolation("received continuation data frame outside fragmented message"): 1632 return False 1633 1634 ## check opcode vs message fragmentation state 2/2 1635 ## 1636 if self.inside_message and frame_opcode != 0: 1637 if self.protocolViolation("received non-continuation data frame while inside fragmented message"): 1638 return False 1639 1640 ## continuation data frames MUST NOT have the compressed bit set 1641 ## 1642 if self._perMessageCompress is not None and frame_rsv == 4 and self.inside_message: 1643 if self.protocolViolation("received continution data frame with compress bit set [%s]" % self._perMessageCompress.EXTENSION_NAME): 1644 return False 1645 1646 ## compute complete header length 1647 ## 1648 if frame_masked: 1649 mask_len = 4 1650 else: 1651 mask_len = 0 1652 1653 if frame_payload_len1 < 126: 1654 frame_header_len = 2 + mask_len 1655 elif frame_payload_len1 == 126: 1656 frame_header_len = 2 + 2 + mask_len 1657 elif frame_payload_len1 == 127: 1658 frame_header_len = 2 + 8 + mask_len 1659 else: 1660 raise Exception("logic error") 1661 1662 ## only proceed when we have enough data buffered for complete 1663 ## frame header (which includes extended payload len + mask) 1664 ## 1665 if buffered_len >= frame_header_len: 1666 1667 ## minimum frame header length (already consumed) 1668 ## 1669 i = 2 1670 1671 ## extract extended payload length 1672 ## 1673 if frame_payload_len1 == 126: 1674 frame_payload_len = struct.unpack("!H", self.data[i:i+2])[0] 1675 if frame_payload_len < 126: 1676 if self.protocolViolation("invalid data frame length (not using minimal length encoding)"): 1677 return False 1678 i += 2 1679 elif frame_payload_len1 == 127: 1680 frame_payload_len = struct.unpack("!Q", self.data[i:i+8])[0] 1681 if frame_payload_len > 0x7FFFFFFFFFFFFFFF: # 2**63 1682 if self.protocolViolation("invalid data frame length (>2^63)"): 1683 return False 1684 if frame_payload_len < 65536: 1685 if self.protocolViolation("invalid data frame length (not using minimal length encoding)"): 1686 return False 1687 i += 8 1688 else: 1689 frame_payload_len = frame_payload_len1 1690 1691 ## when payload is masked, extract frame mask 1692 ## 1693 frame_mask = None 1694 if frame_masked: 1695 frame_mask = self.data[i:i+4] 1696 i += 4 1697 1698 if frame_masked and frame_payload_len > 0 and self.applyMask: 1699 self.current_frame_masker = createXorMasker(frame_mask, frame_payload_len) 1700 else: 1701 self.current_frame_masker = XorMaskerNull() 1702 1703 1704 ## remember rest (payload of current frame after header and everything thereafter) 1705 ## 1706 self.data = self.data[i:] 1707 1708 ## ok, got complete frame header 1709 ## 1710 self.current_frame = FrameHeader(frame_opcode, 1711 frame_fin, 1712 frame_rsv, 1713 frame_payload_len, 1714 frame_mask) 1715 1716 ## process begin on new frame 1717 ## 1718 self.onFrameBegin() 1719 1720 ## reprocess when frame has no payload or and buffered data left 1721 ## 1722 return frame_payload_len == 0 or len(self.data) > 0 1723 1724 else: 1725 return False # need more data 1726 else: 1727 return False # need more data 1728 1729 ## inside a started frame 1730 ## 1731 else: 1732 1733 ## cut out rest of frame payload 1734 ## 1735 rest = self.current_frame.length - self.current_frame_masker.pointer() 1736 if buffered_len >= rest: 1737 data = self.data[:rest] 1738 length = rest 1739 self.data = self.data[rest:] 1740 else: 1741 data = self.data 1742 length = buffered_len 1743 self.data = b'' 1744 1745 if length > 0: 1746 ## unmask payload 1747 ## 1748 payload = self.current_frame_masker.process(data) 1749 else: 1750 ## we also process empty payloads, since we need to fire 1751 ## our hooks (at least for streaming processing, this is 1752 ## necessary for correct protocol state transitioning) 1753 ## 1754 payload = b'' 1755 1756 ## process frame data 1757 ## 1758 fr = self.onFrameData(payload) 1759 if fr == False: 1760 return False 1761 1762 ## fire frame end handler when frame payload is complete 1763 ## 1764 if self.current_frame_masker.pointer() == self.current_frame.length: 1765 fr = self.onFrameEnd() 1766 if fr == False: 1767 return False 1768 1769 ## reprocess when no error occurred and buffered data left 1770 ## 1771 return len(self.data) > 0 1772 1773 1774 def onFrameBegin(self): 1775 """ 1776 Begin of receive new frame. 1777 1778 Modes: Hybi 1779 """ 1780 if self.current_frame.opcode > 7: 1781 self.control_frame_data = [] 1782 else: 1783 ## new message started 1784 ## 1785 if not self.inside_message: 1786 1787 self.inside_message = True 1788 1789 ## setup decompressor 1790 ## 1791 if self._perMessageCompress is not None and self.current_frame.rsv == 4: 1792 self._isMessageCompressed = True 1793 self._perMessageCompress.startDecompressMessage() 1794 else: 1795 self._isMessageCompressed = False 1796 1797 ## setup UTF8 validator 1798 ## 1799 if self.current_frame.opcode == WebSocketProtocol.MESSAGE_TYPE_TEXT and self.utf8validateIncoming: 1800 self.utf8validator.reset() 1801 self.utf8validateIncomingCurrentMessage = True 1802 self.utf8validateLast = (True, True, 0, 0) 1803 else: 1804 self.utf8validateIncomingCurrentMessage = False 1805 1806 ## track timings 1807 ## 1808 if self.trackedTimings: 1809 self.trackedTimings.track("onMessageBegin") 1810 1811 ## fire onMessageBegin 1812 ## 1813 self._onMessageBegin(self.current_frame.opcode == WebSocketProtocol.MESSAGE_TYPE_BINARY) 1814 1815 self._onMessageFrameBegin(self.current_frame.length) 1816 1817 1818 def onFrameData(self, payload): 1819 """ 1820 New data received within frame. 1821 1822 Modes: Hybi 1823 """ 1824 if self.current_frame.opcode > 7: 1825 self.control_frame_data.append(payload) 1826 else: 1827 ## decompress frame payload 1828 ## 1829 if self._isMessageCompressed: 1830 compressedLen = len(payload) 1831 if self.debug: 1832 self.factory._log("RX compressed [%d]: %s" % (compressedLen, binascii.b2a_hex(payload))) 1833 1834 payload = self._perMessageCompress.decompressMessageData(payload) 1835 uncompressedLen = len(payload) 1836 else: 1837 l = len(payload) 1838 compressedLen = l 1839 uncompressedLen = l 1840 1841 if self.state == WebSocketProtocol.STATE_OPEN: 1842 self.trafficStats.incomingOctetsWebSocketLevel += compressedLen 1843 self.trafficStats.incomingOctetsAppLevel += uncompressedLen 1844 1845 ## incrementally validate UTF-8 payload 1846 ## 1847 if self.utf8validateIncomingCurrentMessage: 1848 self.utf8validateLast = self.utf8validator.validate(payload) 1849 if not self.utf8validateLast[0]: 1850 if self.invalidPayload("encountered invalid UTF-8 while processing text message at payload octet index %d" % self.utf8validateLast[3]): 1851 return False 1852 1853 self._onMessageFrameData(payload) 1854 1855 1856 def onFrameEnd(self): 1857 """ 1858 End of frame received. 1859 1860 Modes: Hybi 1861 """ 1862 if self.current_frame.opcode > 7: 1863 if self.logFrames: 1864 self.logRxFrame(self.current_frame, self.control_frame_data) 1865 self.processControlFrame() 1866 else: 1867 if self.state == WebSocketProtocol.STATE_OPEN: 1868 self.trafficStats.incomingWebSocketFrames += 1 1869 if self.logFrames: 1870 self.logRxFrame(self.current_frame, self.frame_data) 1871 1872 self._onMessageFrameEnd() 1873 1874 if self.current_frame.fin: 1875 1876 ## handle end of compressed message 1877 ## 1878 if self._isMessageCompressed: 1879 self._perMessageCompress.endDecompressMessage() 1880 1881 ## verify UTF8 has actually ended 1882 ## 1883 if self.utf8validateIncomingCurrentMessage: 1884 if not self.utf8validateLast[1]: 1885 if self.invalidPayload("UTF-8 text message payload ended within Unicode code point at payload octet index %d" % self.utf8validateLast[3]): 1886 return False 1887 1888 #if self.debug: 1889 # self.factory._log("Traffic statistics:\n" + str(self.trafficStats)) 1890 1891 if self.state == WebSocketProtocol.STATE_OPEN: 1892 self.trafficStats.incomingWebSocketMessages += 1 1893 1894 self._onMessageEnd() 1895 self.inside_message = False 1896 1897 self.current_frame = None 1898 1899 1900 def processControlFrame(self): 1901 """ 1902 Process a completely received control frame. 1903 1904 Modes: Hybi 1905 """ 1906 1907 payload = b''.join(self.control_frame_data) 1908 self.control_frame_data = None 1909 1910 ## CLOSE frame 1911 ## 1912 if self.current_frame.opcode == 8: 1913 1914 code = None 1915 reasonRaw = None 1916 ll = len(payload) 1917 if ll > 1: 1918 code = struct.unpack("!H", payload[0:2])[0] 1919 if ll > 2: 1920 reasonRaw = payload[2:] 1921 1922 if self.onCloseFrame(code, reasonRaw): 1923 return False 1924 1925 ## PING frame 1926 ## 1927 elif self.current_frame.opcode == 9: 1928 self._onPing(payload) 1929 1930 ## PONG frame 1931 ## 1932 elif self.current_frame.opcode == 10: 1933 self._onPong(payload) 1934 1935 else: 1936 ## we might arrive here, when protocolViolation 1937 ## wants us to continue anyway 1938 pass 1939 1940 return True 1941 1942 1943 def sendFrame(self, 1944 opcode, 1945 payload = b'', 1946 fin = True, 1947 rsv = 0, 1948 mask = None, 1949 payload_len = None, 1950 chopsize = None, 1951 sync = False): 1952 """ 1953 Send out frame. Normally only used internally via sendMessage(), sendPing(), sendPong() and sendClose(). 1954 1955 This method deliberately allows to send invalid frames (that is frames invalid 1956 per-se, or frames invalid because of protocol state). Other than in fuzzing servers, 1957 calling methods will ensure that no invalid frames are sent. 1958 1959 In addition, this method supports explicit specification of payload length. 1960 When payload_len is given, it will always write that many octets to the stream. 1961 It'll wrap within payload, resending parts of that when more octets were requested 1962 The use case is again for fuzzing server which want to sent increasing amounts 1963 of payload data to peers without having to construct potentially large messages 1964 themselves. 1965 1966 Modes: Hybi 1967 """ 1968 if self.websocket_version == 0: 1969 raise Exception("function not supported in Hixie-76 mode") 1970 1971 if payload_len is not None: 1972 if len(payload) < 1: 1973 raise Exception("cannot construct repeated payload with length %d from payload of length %d" % (payload_len, len(payload))) 1974 l = payload_len 1975 pl = b''.join([payload for _ in range(payload_len / len(payload))]) + payload[:payload_len % len(payload)] 1976 else: 1977 l = len(payload) 1978 pl = payload 1979 1980 ## first byte 1981 ## 1982 b0 = 0 1983 if fin: 1984 b0 |= (1 << 7) 1985 b0 |= (rsv % 8) << 4 1986 b0 |= opcode % 128 1987 1988 ## second byte, payload len bytes and mask 1989 ## 1990 b1 = 0 1991 if mask or (not self.factory.isServer and self.maskClientFrames) or (self.factory.isServer and self.maskServerFrames): 1992 b1 |= 1 << 7 1993 if not mask: 1994 mask = struct.pack("!I", random.getrandbits(32)) 1995 mv = mask 1996 else: 1997 mv = b'' 1998 1999 ## mask frame payload 2000 ## 2001 if l > 0 and self.applyMask: 2002 masker = createXorMasker(mask, l) 2003 plm = masker.process(pl) 2004 else: 2005 plm = pl 2006 2007 else: 2008 mv = b'' 2009 plm = pl 2010 2011 el = b'' 2012 if l <= 125: 2013 b1 |= l 2014 elif l <= 0xFFFF: 2015 b1 |= 126 2016 el = struct.pack("!H", l) 2017 elif l <= 0x7FFFFFFFFFFFFFFF: 2018 b1 |= 127 2019 el = struct.pack("!Q", l) 2020 else: 2021 raise Exception("invalid payload length") 2022 2023 if six.PY3: 2024 raw = b''.join([b0.to_bytes(1, 'big'), b1.to_bytes(1, 'big'), el, mv, plm]) 2025 else: 2026 raw = b''.join([chr(b0), chr(b1), el, mv, plm]) 2027 2028 if opcode in [0, 1, 2]: 2029 self.trafficStats.outgoingWebSocketFrames += 1 2030 2031 if self.logFrames: 2032 frameHeader = FrameHeader(opcode, fin, rsv, l, mask) 2033 self.logTxFrame(frameHeader, payload, payload_len, chopsize, sync) 2034 2035 ## send frame octets 2036 ## 2037 self.sendData(raw, sync, chopsize) 2038 2039 2040 def sendPing(self, payload = None): 2041 """ 2042 Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.sendPing` 2043 """ 2044 if self.websocket_version == 0: 2045 raise Exception("function not supported in Hixie-76 mode") 2046 if self.state != WebSocketProtocol.STATE_OPEN: 2047 return 2048 if payload: 2049 l = len(payload) 2050 if l > 125: 2051 raise Exception("invalid payload for PING (payload length must be <= 125, was %d)" % l) 2052 self.sendFrame(opcode = 9, payload = payload) 2053 else: 2054 self.sendFrame(opcode = 9) 2055 2056 2057 def sendPong(self, payload = None): 2058 """ 2059 Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.sendPong` 2060 """ 2061 if self.websocket_version == 0: 2062 raise Exception("function not supported in Hixie-76 mode") 2063 if self.state != WebSocketProtocol.STATE_OPEN: 2064 return 2065 if payload: 2066 l = len(payload) 2067 if l > 125: 2068 raise Exception("invalid payload for PONG (payload length must be <= 125, was %d)" % l) 2069 self.sendFrame(opcode = 10, payload = payload) 2070 else: 2071 self.sendFrame(opcode = 10) 2072 2073 2074 def sendCloseFrame(self, code = None, reasonUtf8 = None, isReply = False): 2075 """ 2076 Send a close frame and update protocol state. Note, that this is 2077 an internal method which deliberately allows not send close 2078 frame with invalid payload. 2079 2080 Modes: Hybi, Hixie 2081 2082 Notes: 2083 - For Hixie mode, this method is slightly misnamed for historic reasons. 2084 - For Hixie mode, code and reasonUtf8 will be silently ignored. 2085 """ 2086 if self.state == WebSocketProtocol.STATE_CLOSING: 2087 if self.debugCodePaths: 2088 self.factory._log("ignoring sendCloseFrame since connection is closing") 2089 2090 elif self.state == WebSocketProtocol.STATE_CLOSED: 2091 if self.debugCodePaths: 2092 self.factory._log("ignoring sendCloseFrame since connection already closed") 2093 2094 elif self.state in [WebSocketProtocol.STATE_PROXY_CONNECTING, WebSocketProtocol.STATE_CONNECTING]: 2095 raise Exception("cannot close a connection not yet connected") 2096 2097 elif self.state == WebSocketProtocol.STATE_OPEN: 2098 2099 if self.websocket_version == 0: 2100 self.sendData("\xff\x00") 2101 else: 2102 ## construct Hybi close frame payload and send frame 2103 payload = b'' 2104 if code is not None: 2105 payload += struct.pack("!H", code) 2106 if reasonUtf8 is not None: 2107 payload += reasonUtf8 2108 self.sendFrame(opcode = 8, payload = payload) 2109 2110 ## update state 2111 self.state = WebSocketProtocol.STATE_CLOSING 2112 self.closedByMe = not isReply 2113 2114 ## remember payload of close frame we sent 2115 self.localCloseCode = code 2116 self.localCloseReason = reasonUtf8 2117 2118 ## drop connection when timeout on receiving close handshake reply 2119 if self.closedByMe and self.closeHandshakeTimeout > 0: 2120 self.closeHandshakeTimeoutCall = self.factory._callLater(self.closeHandshakeTimeout, self.onCloseHandshakeTimeout) 2121 2122 else: 2123 raise Exception("logic error") 2124 2125 2126 def sendClose(self, code = None, reason = None): 2127 """ 2128 Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.sendClose` 2129 """ 2130 if code is not None: 2131 if type(code) != int: 2132 raise Exception("invalid type %s for close code" % type(code)) 2133 if code != 1000 and not (code >= 3000 and code <= 4999): 2134 raise Exception("invalid close code %d" % code) 2135 if reason is not None: 2136 if code is None: 2137 raise Exception("close reason without close code") 2138 if type(reason) != six.text_type: 2139 raise Exception("invalid type %s for close reason" % type(reason)) 2140 reasonUtf8 = reason.encode("utf8") 2141 if len(reasonUtf8) + 2 > 125: 2142 raise Exception("close reason too long (%d)" % len(reasonUtf8)) 2143 else: 2144 reasonUtf8 = None 2145 self.sendCloseFrame(code = code, reasonUtf8 = reasonUtf8, isReply = False) 2146 2147 2148 def beginMessage(self, isBinary = False, doNotCompress = False): 2149 """ 2150 Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.beginMessage` 2151 """ 2152 if self.state != WebSocketProtocol.STATE_OPEN: 2153 return 2154 2155 ## check if sending state is valid for this method 2156 ## 2157 if self.send_state != WebSocketProtocol.SEND_STATE_GROUND: 2158 raise Exception("WebSocketProtocol.beginMessage invalid in current sending state") 2159 2160 if self.websocket_version == 0: 2161 if isBinary: 2162 raise Exception("cannot send binary message in Hixie76 mode") 2163 2164 self.sendData('\x00') 2165 self.send_state = WebSocketProtocol.SEND_STATE_INSIDE_MESSAGE 2166 else: 2167 self.send_message_opcode = WebSocketProtocol.MESSAGE_TYPE_BINARY if isBinary else WebSocketProtocol.MESSAGE_TYPE_TEXT 2168 self.send_state = WebSocketProtocol.SEND_STATE_MESSAGE_BEGIN 2169 2170 ## setup compressor 2171 ## 2172 if self._perMessageCompress is not None and not doNotCompress: 2173 self.send_compressed = True 2174 self._perMessageCompress.startCompressMessage() 2175 else: 2176 self.send_compressed = False 2177 2178 self.trafficStats.outgoingWebSocketMessages += 1 2179 2180 2181 def beginMessageFrame(self, length): 2182 """ 2183 Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.beginMessageFrame` 2184 """ 2185 if self.websocket_version == 0: 2186 raise Exception("function not supported in Hixie-76 mode") 2187 2188 if self.state != WebSocketProtocol.STATE_OPEN: 2189 return 2190 2191 ## check if sending state is valid for this method 2192 ## 2193 if self.send_state not in [WebSocketProtocol.SEND_STATE_MESSAGE_BEGIN, WebSocketProtocol.SEND_STATE_INSIDE_MESSAGE]: 2194 raise Exception("WebSocketProtocol.beginMessageFrame invalid in current sending state [%d]" % self.send_state) 2195 2196 if type(length) != int or length < 0 or length > 0x7FFFFFFFFFFFFFFF: # 2**63 2197 raise Exception("invalid value for message frame length") 2198 2199 self.send_message_frame_length = length 2200 2201 self.trafficStats.outgoingWebSocketFrames += 1 2202 2203 if (not self.factory.isServer and self.maskClientFrames) or (self.factory.isServer and self.maskServerFrames): 2204 ## automatic mask: 2205 ## - client-to-server masking (if not deactivated) 2206 ## - server-to-client masking (if activated) 2207 ## 2208 self.send_message_frame_mask = struct.pack("!I", random.getrandbits(32)) 2209 2210 else: 2211 ## no mask 2212 ## 2213 self.send_message_frame_mask = None 2214 2215 ## payload masker 2216 ## 2217 if self.send_message_frame_mask and length > 0 and self.applyMask: 2218 self.send_message_frame_masker = createXorMasker(self.send_message_frame_mask, length) 2219 else: 2220 self.send_message_frame_masker = XorMaskerNull() 2221 2222 ## first byte 2223 ## 2224 # FIN = false .. since with streaming, we don't know when message ends 2225 b0 = 0 2226 if self.send_state == WebSocketProtocol.SEND_STATE_MESSAGE_BEGIN: 2227 2228 b0 |= self.send_message_opcode % 128 2229 2230 if self.send_compressed: 2231 b0 |= (4 % 8) << 4 2232 2233 self.send_state = WebSocketProtocol.SEND_STATE_INSIDE_MESSAGE 2234 else: 2235 pass # message continuation frame 2236 2237 ## second byte, payload len bytes and mask 2238 ## 2239 b1 = 0 2240 if self.send_message_frame_mask: 2241 b1 |= 1 << 7 2242 mv = self.send_message_frame_mask 2243 else: 2244 mv = b'' 2245 2246 el = b'' 2247 if length <= 125: 2248 b1 |= length 2249 elif length <= 0xFFFF: 2250 b1 |= 126 2251 el = struct.pack("!H", length) 2252 elif length <= 0x7FFFFFFFFFFFFFFF: 2253 b1 |= 127 2254 el = struct.pack("!Q", length) 2255 else: 2256 raise Exception("invalid payload length") 2257 2258 ## write message frame header 2259 ## 2260 if six.PY3: 2261 header = b''.join([b0.to_bytes(1, 'big'), b1.to_bytes(1, 'big'), el, mv]) 2262 else: 2263 header = b''.join([chr(b0), chr(b1), el, mv]) 2264 2265 self.sendData(header) 2266 2267 ## now we are inside message frame .. 2268 ## 2269 self.send_state = WebSocketProtocol.SEND_STATE_INSIDE_MESSAGE_FRAME 2270 2271 2272 def sendMessageFrameData(self, payload, sync = False): 2273 """ 2274 Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.sendMessageFrameData` 2275 """ 2276 if self.state != WebSocketProtocol.STATE_OPEN: 2277 return 2278 2279 if not self.send_compressed: 2280 self.trafficStats.outgoingOctetsAppLevel += len(payload) 2281 self.trafficStats.outgoingOctetsWebSocketLevel += len(payload) 2282 2283 if self.websocket_version == 0: 2284 ## Hixie Mode 2285 ## 2286 if self.send_state != WebSocketProtocol.SEND_STATE_INSIDE_MESSAGE: 2287 raise Exception("WebSocketProtocol.sendMessageFrameData invalid in current sending state") 2288 self.sendData(payload, sync = sync) 2289 return None 2290 2291 else: 2292 ## Hybi Mode 2293 ## 2294 if self.send_state != WebSocketProtocol.SEND_STATE_INSIDE_MESSAGE_FRAME: 2295 raise Exception("WebSocketProtocol.sendMessageFrameData invalid in current sending state") 2296 2297 rl = len(payload) 2298 if self.send_message_frame_masker.pointer() + rl > self.send_message_frame_length: 2299 l = self.send_message_frame_length - self.send_message_frame_masker.pointer() 2300 rest = -(rl - l) 2301 pl = payload[:l] 2302 else: 2303 l = rl 2304 rest = self.send_message_frame_length - self.send_message_frame_masker.pointer() - l 2305 pl = payload 2306 2307 ## mask frame payload 2308 ## 2309 plm = self.send_message_frame_masker.process(pl) 2310 2311 ## send frame payload 2312 ## 2313 self.sendData(plm, sync = sync) 2314 2315 ## if we are done with frame, move back into "inside message" state 2316 ## 2317 if self.send_message_frame_masker.pointer() >= self.send_message_frame_length: 2318 self.send_state = WebSocketProtocol.SEND_STATE_INSIDE_MESSAGE 2319 2320 ## when =0 : frame was completed exactly 2321 ## when >0 : frame is still uncomplete and that much amount is still left to complete the frame 2322 ## when <0 : frame was completed and there was this much unconsumed data in payload argument 2323 ## 2324 return rest 2325 2326 2327 def endMessage(self): 2328 """ 2329 Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.endMessage` 2330 """ 2331 if self.state != WebSocketProtocol.STATE_OPEN: 2332 return 2333 2334 ## check if sending state is valid for this method 2335 ## 2336 #if self.send_state != WebSocketProtocol.SEND_STATE_INSIDE_MESSAGE: 2337 # raise Exception("WebSocketProtocol.endMessage invalid in current sending state [%d]" % self.send_state) 2338 2339 if self.websocket_version == 0: 2340 self.sendData('\x00') 2341 else: 2342 if self.send_compressed: 2343 payload = self._perMessageCompress.endCompressMessage() 2344 self.trafficStats.outgoingOctetsWebSocketLevel += len(payload) 2345 else: 2346 ## send continuation frame with empty payload and FIN set to end message 2347 payload = b'' 2348 self.sendFrame(opcode = 0, payload = payload, fin = True) 2349 2350 self.send_state = WebSocketProtocol.SEND_STATE_GROUND 2351 2352 2353 def sendMessageFrame(self, payload, sync = False): 2354 """ 2355 Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.sendMessageFrame` 2356 """ 2357 if self.websocket_version == 0: 2358 raise Exception("function not supported in Hixie-76 mode") 2359 2360 if self.state != WebSocketProtocol.STATE_OPEN: 2361 return 2362 2363 if self.send_compressed: 2364 self.trafficStats.outgoingOctetsAppLevel += len(payload) 2365 payload = self._perMessageCompress.compressMessageData(payload) 2366 2367 self.beginMessageFrame(len(payload)) 2368 self.sendMessageFrameData(payload, sync) 2369 2370 2371 def sendMessage(self, 2372 payload, 2373 isBinary = False, 2374 fragmentSize = None, 2375 sync = False, 2376 doNotCompress = False): 2377 """ 2378 Implements :func:`autobahn.websocket.interfaces.IWebSocketChannel.sendMessage` 2379 """ 2380 assert(type(payload) == bytes) 2381 2382 if self.state != WebSocketProtocol.STATE_OPEN: 2383 return 2384 2385 if self.trackedTimings: 2386 self.trackedTimings.track("sendMessage") 2387 2388 if self.websocket_version == 0: 2389 if isBinary: 2390 raise Exception("cannot send binary message in Hixie76 mode") 2391 if fragmentSize: 2392 raise Exception("cannot fragment messages in Hixie76 mode") 2393 self.sendMessageHixie76(payload, sync) 2394 else: 2395 self.sendMessageHybi(payload, isBinary, fragmentSize, sync, doNotCompress) 2396 2397 2398 def sendMessageHixie76(self, payload, sync = False): 2399 """ 2400 Hixie76-Variant of sendMessage(). 2401 2402 Modes: Hixie 2403 """ 2404 self.sendData(b'\x00' + payload + b'\xff', sync = sync) 2405 2406 2407 def sendMessageHybi(self, 2408 payload, 2409 isBinary = False, 2410 fragmentSize = None, 2411 sync = False, 2412 doNotCompress = False): 2413 """ 2414 Hybi-Variant of sendMessage(). 2415 2416 Modes: Hybi 2417 """ 2418 ## (initial) frame opcode 2419 ## 2420 if isBinary: 2421 opcode = 2 2422 else: 2423 opcode = 1 2424 2425 self.trafficStats.outgoingWebSocketMessages += 1 2426 2427 ## setup compressor 2428 ## 2429 if self._perMessageCompress is not None and not doNotCompress: 2430 sendCompressed = True 2431 2432 self._perMessageCompress.startCompressMessage() 2433 2434 self.trafficStats.outgoingOctetsAppLevel += len(payload) 2435 2436 payload1 = self._perMessageCompress.compressMessageData(payload) 2437 payload2 = self._perMessageCompress.endCompressMessage() 2438 payload = b''.join([payload1, payload2]) 2439 2440 self.trafficStats.outgoingOctetsWebSocketLevel += len(payload) 2441 2442 else: 2443 sendCompressed = False 2444 l = len(payload) 2445 self.trafficStats.outgoingOctetsAppLevel += l 2446 self.trafficStats.outgoingOctetsWebSocketLevel += l 2447 2448 ## explicit fragmentSize arguments overrides autoFragmentSize setting 2449 ## 2450 if fragmentSize is not None: 2451 pfs = fragmentSize 2452 else: 2453 if self.autoFragmentSize > 0: 2454 pfs = self.autoFragmentSize 2455 else: 2456 pfs = None 2457 2458 ## send unfragmented 2459 ## 2460 if pfs is None or len(payload) <= pfs: 2461 self.sendFrame(opcode = opcode, payload = payload, sync = sync, rsv = 4 if sendCompressed else 0) 2462 2463 ## send data message in fragments 2464 ## 2465 else: 2466 if pfs < 1: 2467 raise Exception("payload fragment size must be at least 1 (was %d)" % pfs) 2468 n = len(payload) 2469 i = 0 2470 done = False 2471 first = True 2472 while not done: 2473 j = i + pfs 2474 if j > n: 2475 done = True 2476 j = n 2477 if first: 2478 self.sendFrame(opcode = opcode, payload = payload[i:j], fin = done, sync = sync, rsv = 4 if sendCompressed else 0) 2479 first = False 2480 else: 2481 self.sendFrame(opcode = 0, payload = payload[i:j], fin = done, sync = sync) 2482 i += pfs 2483 2484 #if self.debug: 2485 # self.factory._log("Traffic statistics:\n" + str(self.trafficStats)) 2486 2487 2488 def _parseExtensionsHeader(self, header, removeQuotes = True): 2489 """ 2490 Parse the Sec-WebSocket-Extensions header. 2491 """ 2492 extensions = [] 2493 exts = [str(x.strip()) for x in header.split(',')] 2494 for e in exts: 2495 if e != "": 2496 ext = [x.strip() for x in e.split(";")] 2497 if len(ext) > 0: 2498 extension = ext[0].lower() 2499 params = {} 2500 for p in ext[1:]: 2501 p = [x.strip() for x in p.split("=")] 2502 key = p[0].lower() 2503 if len(p) > 1: 2504 value = "=".join(p[1:]) 2505 if removeQuotes: 2506 if len(value) > 0 and value[0] == '"': 2507 value = value[1:] 2508 if len(value) > 0 and value[-1] == '"': 2509 value = value[:-1] 2510 else: 2511 value = True 2512 if not key in params: 2513 params[key] = [] 2514 params[key].append(value) 2515 extensions.append((extension, params)) 2516 else: 2517 pass # should not arrive here 2518 return extensions 2519 2520 2521 2522IWebSocketChannel.register(WebSocketProtocol) 2523IWebSocketChannelFrameApi.register(WebSocketProtocol) 2524IWebSocketChannelStreamingApi.register(WebSocketProtocol) 2525 2526 2527 2528class PreparedMessage: 2529 """ 2530 Encapsulates a prepared message to be sent later once or multiple 2531 times on one or more WebSocket connections. 2532 This can be used for optimizing Broadcast/PubSub. 2533 """ 2534 2535 def __init__(self, payload, isBinary, applyMask, doNotCompress): 2536 """ 2537 Ctor for a prepared message. 2538 2539 :param payload: The message payload. 2540 :type payload: str 2541 :param isBinary: Provide `True` for binary payload. 2542 :type isBinary: bool 2543 :param applyMask: Provide `True` if WebSocket message is to be masked (required for client to server WebSocket messages). 2544 :type applyMask: bool 2545 :param doNotCompress: Iff `True`, never compress this message. This only applies to 2546 Hybi-Mode and only when WebSocket compression has been negotiated on 2547 the WebSocket connection. Use when you know the payload 2548 incompressible (e.g. encrypted or already compressed). 2549 :type doNotCompress: bool 2550 """ 2551 if not doNotCompress: 2552 ## we need to store original payload for compressed WS 2553 ## connections (cannot compress/frame in advanced when 2554 ## compression is on, and context takeover is off) 2555 self.payload = payload 2556 self.binary = isBinary 2557 self.doNotCompress = doNotCompress 2558 2559 ## store pre-framed octets to be sent to Hixie-76 peers 2560 self._initHixie(payload, isBinary) 2561 2562 ## store pre-framed octets to be sent to Hybi peers 2563 self._initHybi(payload, isBinary, applyMask) 2564 2565 2566 def _initHixie(self, payload, binary): 2567 if binary: 2568 # silently filter out .. probably do something else: 2569 # base64? 2570 # dunno 2571 self.payloadHixie = '' 2572 else: 2573 self.payloadHixie = '\x00' + payload + '\xff' 2574 2575 2576 def _initHybi(self, payload, binary, masked): 2577 l = len(payload) 2578 2579 ## first byte 2580 ## 2581 b0 = ((1 << 7) | 2) if binary else ((1 << 7) | 1) 2582 2583 ## second byte, payload len bytes and mask 2584 ## 2585 if masked: 2586 b1 = 1 << 7 2587 mask = struct.pack("!I", random.getrandbits(32)) 2588 if l == 0: 2589 plm = payload 2590 else: 2591 plm = createXorMasker(mask, l).process(payload) 2592 else: 2593 b1 = 0 2594 mask = b'' 2595 plm = payload 2596 2597 ## payload extended length 2598 ## 2599 el = b'' 2600 if l <= 125: 2601 b1 |= l 2602 elif l <= 0xFFFF: 2603 b1 |= 126 2604 el = struct.pack("!H", l) 2605 elif l <= 0x7FFFFFFFFFFFFFFF: 2606 b1 |= 127 2607 el = struct.pack("!Q", l) 2608 else: 2609 raise Exception("invalid payload length") 2610 2611 ## raw WS message (single frame) 2612 ## 2613 if six.PY3: 2614 self.payloadHybi = b''.join([b0.to_bytes(1, 'big'), b1.to_bytes(1, 'big'), el, mask, plm]) 2615 else: 2616 self.payloadHybi = b''.join([chr(b0), chr(b1), el, mask, plm]) 2617 2618 2619 2620class WebSocketFactory: 2621 """ 2622 Mixin for 2623 :class:`autobahn.websocket.protocol.WebSocketClientFactory` and 2624 :class:`autobahn.websocket.protocol.WebSocketServerFactory`. 2625 """ 2626 2627 def prepareMessage(self, payload, isBinary = False, doNotCompress = False): 2628 """ 2629 Prepare a WebSocket message. This can be later sent on multiple 2630 instances of :class:`autobahn.websocket.WebSocketProtocol` using 2631 :meth:`autobahn.websocket.WebSocketProtocol.sendPreparedMessage`. 2632 2633 By doing so, you can avoid the (small) overhead of framing the 2634 *same* payload into WebSocket messages multiple times when that 2635 same payload is to be sent out on multiple connections. 2636 2637 :param payload: The message payload. 2638 :type payload: bytes 2639 :param isBinary: `True` iff payload is binary, else the payload must be UTF-8 encoded text. 2640 :type isBinary: bool 2641 :param doNotCompress: Iff `True`, never compress this message. This only applies to 2642 Hybi-Mode and only when WebSocket compression has been negotiated on 2643 the WebSocket connection. Use when you know the payload 2644 incompressible (e.g. encrypted or already compressed). 2645 :type doNotCompress: bool 2646 2647 :returns: obj -- An instance of :class:`autobahn.websocket.protocol.PreparedMessage`. 2648 """ 2649 applyMask = not self.isServer 2650 return PreparedMessage(payload, isBinary, applyMask, doNotCompress) 2651 2652 2653 2654class WebSocketServerProtocol(WebSocketProtocol): 2655 """ 2656 Protocol base class for WebSocket servers. 2657 """ 2658 2659 CONFIG_ATTRS = WebSocketProtocol.CONFIG_ATTRS_COMMON + WebSocketProtocol.CONFIG_ATTRS_SERVER 2660 2661 2662 def onConnect(self, request): 2663 """ 2664 Callback fired during WebSocket opening handshake when new WebSocket client 2665 connection is about to be established. 2666 2667 When you want to accept the connection, return the accepted protocol 2668 from list of WebSocket (sub)protocols provided by client or `None` to 2669 speak no specific one or when the client protocol list was empty. 2670 2671 You may also return a pair of `(protocol, headers)` to send additional 2672 HTTP headers, with `headers` being a dictionary of key-values. 2673 2674 Throw :class:`autobahn.websocket.http.HttpException` when you don't want 2675 to accept the WebSocket connection request. 2676 2677 :param request: WebSocket connection request information. 2678 :type request: instance of :class:`autobahn.websocket.protocol.ConnectionRequest` 2679 """ 2680 return None 2681 2682 2683 def _connectionMade(self): 2684 """ 2685 Called by network framework when new transport connection from client was 2686 accepted. Default implementation will prepare for initial WebSocket opening 2687 handshake. When overriding in derived class, make sure to call this base class 2688 implementation *before* your code. 2689 """ 2690 WebSocketProtocol._connectionMade(self) 2691 self.factory.countConnections += 1 2692 if self.debug: 2693 self.factory._log("connection accepted from peer %s" % self.peer) 2694 2695 2696 def _connectionLost(self, reason): 2697 """ 2698 Called by network framework when established transport connection from client 2699 was lost. Default implementation will tear down all state properly. 2700 When overriding in derived class, make sure to call this base class 2701 implementation *after* your code. 2702 """ 2703 WebSocketProtocol._connectionLost(self, reason) 2704 self.factory.countConnections -= 1 2705 if self.debug: 2706 self.factory._log("connection from %s lost" % self.peer) 2707 2708 2709 def processProxyConnect(self): 2710 raise Exception("Autobahn isn't a proxy server") 2711 2712 2713 def parseHixie76Key(self, key): 2714 """ 2715 Parse Hixie76 opening handshake key provided by client. 2716 """ 2717 return int(filter(lambda x: x.isdigit(), key)) / key.count(" ") 2718 2719 2720 def processHandshake(self): 2721 """ 2722 Process WebSocket opening handshake request from client. 2723 """ 2724 ## only proceed when we have fully received the HTTP request line and all headers 2725 ## 2726 end_of_header = self.data.find(b"\x0d\x0a\x0d\x0a") 2727 if end_of_header >= 0: 2728 2729 self.http_request_data = self.data[:end_of_header + 4] 2730 if self.debug: 2731 self.factory._log("received HTTP request:\n\n%s\n\n" % self.http_request_data) 2732 2733 ## extract HTTP status line and headers 2734 ## 2735 (self.http_status_line, self.http_headers, http_headers_cnt) = parseHttpHeader(self.http_request_data) 2736 2737 ## validate WebSocket opening handshake client request 2738 ## 2739 if self.debug: 2740 self.factory._log("received HTTP status line in opening handshake : %s" % str(self.http_status_line)) 2741 self.factory._log("received HTTP headers in opening handshake : %s" % str(self.http_headers)) 2742 2743 ## HTTP Request line : METHOD, VERSION 2744 ## 2745 rl = self.http_status_line.split() 2746 if len(rl) != 3: 2747 return self.failHandshake("Bad HTTP request status line '%s'" % self.http_status_line) 2748 if rl[0].strip() != "GET": 2749 return self.failHandshake("HTTP method '%s' not allowed" % rl[0], http.METHOD_NOT_ALLOWED[0]) 2750 vs = rl[2].strip().split("/") 2751 if len(vs) != 2 or vs[0] != "HTTP" or vs[1] not in ["1.1"]: 2752 return self.failHandshake("Unsupported HTTP version '%s'" % rl[2], http.UNSUPPORTED_HTTP_VERSION[0]) 2753 2754 ## HTTP Request line : REQUEST-URI 2755 ## 2756 self.http_request_uri = rl[1].strip() 2757 try: 2758 (scheme, netloc, path, params, query, fragment) = urllib.parse.urlparse(self.http_request_uri) 2759 2760 ## FIXME: check that if absolute resource URI is given, 2761 ## the scheme/netloc matches the server 2762 if scheme != "" or netloc != "": 2763 pass 2764 2765 ## Fragment identifiers are meaningless in the context of WebSocket 2766 ## URIs, and MUST NOT be used on these URIs. 2767 if fragment != "": 2768 return self.failHandshake("HTTP requested resource contains a fragment identifier '%s'" % fragment) 2769 2770 ## resource path and query parameters .. this will get forwarded 2771 ## to onConnect() 2772 self.http_request_path = path 2773 self.http_request_params = urllib.parse.parse_qs(query) 2774 except: 2775 return self.failHandshake("Bad HTTP request resource - could not parse '%s'" % rl[1].strip()) 2776 2777 ## Host 2778 ## 2779 if not 'host' in self.http_headers: 2780 return self.failHandshake("HTTP Host header missing in opening handshake request") 2781 2782 if http_headers_cnt["host"] > 1: 2783 return self.failHandshake("HTTP Host header appears more than once in opening handshake request") 2784 2785 self.http_request_host = self.http_headers["host"].strip() 2786 2787 if self.http_request_host.find(":") >= 0: 2788 (h, p) = self.http_request_host.split(":") 2789 try: 2790 port = int(str(p.strip())) 2791 except: 2792 return self.failHandshake("invalid port '%s' in HTTP Host header '%s'" % (str(p.strip()), str(self.http_request_host))) 2793 2794 ## do port checking only if externalPort or URL was set 2795 if self.factory.externalPort: 2796 if port != self.factory.externalPort: 2797 return self.failHandshake("port %d in HTTP Host header '%s' does not match server listening port %s" % (port, str(self.http_request_host), self.factory.externalPort)) 2798 else: 2799 if self.debugCodePaths: 2800 self.factory._log("skipping openening handshake port checking - neither WS URL nor external port set") 2801 2802 self.http_request_host = h 2803 2804 else: 2805 ## do port checking only if externalPort or URL was set 2806 if self.factory.externalPort: 2807 if not ((self.factory.isSecure and self.factory.externalPort == 443) or (not self.factory.isSecure and self.factory.externalPort == 80)): 2808 return self.failHandshake("missing port in HTTP Host header '%s' and server runs on non-standard port %d (wss = %s)" % (str(self.http_request_host), self.factory.externalPort, self.factory.isSecure)) 2809 else: 2810 if self.debugCodePaths: 2811 self.factory._log("skipping openening handshake port checking - neither WS URL nor external port set") 2812 2813 ## Upgrade 2814 ## 2815 if not 'upgrade' in self.http_headers: 2816 ## When no WS upgrade, render HTML server status page 2817 ## 2818 if self.webStatus: 2819 if 'redirect' in self.http_request_params and len(self.http_request_params['redirect']) > 0: 2820 ## To specifiy an URL for redirection, encode the URL, i.e. from JavaScript: 2821 ## 2822 ## var url = encodeURIComponent("http://autobahn.ws/python"); 2823 ## 2824 ## and append the encoded string as a query parameter 'redirect' 2825 ## 2826 ## http://localhost:9000?redirect=http%3A%2F%2Fautobahn.ws%2Fpython 2827 ## https://localhost:9000?redirect=https%3A%2F%2Ftwitter.com%2F 2828 ## 2829 ## This will perform an immediate HTTP-303 redirection. If you provide 2830 ## an additional parameter 'after' (int >= 0), the redirection happens 2831 ## via Meta-Refresh in the rendered HTML status page, i.e. 2832 ## 2833 ## https://localhost:9000/?redirect=https%3A%2F%2Ftwitter.com%2F&after=3 2834 ## 2835 url = self.http_request_params['redirect'][0] 2836 if 'after' in self.http_request_params and len(self.http_request_params['after']) > 0: 2837 after = int(self.http_request_params['after'][0]) 2838 if self.debugCodePaths: 2839 self.factory._log("HTTP Upgrade header missing : render server status page and meta-refresh-redirecting to %s after %d seconds" % (url, after)) 2840 self.sendServerStatus(url, after) 2841 else: 2842 if self.debugCodePaths: 2843 self.factory._log("HTTP Upgrade header missing : 303-redirecting to %s" % url) 2844 self.sendRedirect(url) 2845 else: 2846 if self.debugCodePaths: 2847 self.factory._log("HTTP Upgrade header missing : render server status page") 2848 self.sendServerStatus() 2849 self.dropConnection(abort = False) 2850 return 2851 else: 2852 return self.failHandshake("HTTP Upgrade header missing", http.UPGRADE_REQUIRED[0]) 2853 upgradeWebSocket = False 2854 for u in self.http_headers["upgrade"].split(","): 2855 if u.strip().lower() == "websocket": 2856 upgradeWebSocket = True 2857 break 2858 if not upgradeWebSocket: 2859 return self.failHandshake("HTTP Upgrade headers do not include 'websocket' value (case-insensitive) : %s" % self.http_headers["upgrade"]) 2860 2861 ## Connection 2862 ## 2863 if not 'connection' in self.http_headers: 2864 return self.failHandshake("HTTP Connection header missing") 2865 connectionUpgrade = False 2866 for c in self.http_headers["connection"].split(","): 2867 if c.strip().lower() == "upgrade": 2868 connectionUpgrade = True 2869 break 2870 if not connectionUpgrade: 2871 return self.failHandshake("HTTP Connection headers do not include 'upgrade' value (case-insensitive) : %s" % self.http_headers["connection"]) 2872 2873 ## Sec-WebSocket-Version PLUS determine mode: Hybi or Hixie 2874 ## 2875 if not 'sec-websocket-version' in self.http_headers: 2876 if self.debugCodePaths: 2877 self.factory._log("Hixie76 protocol detected") 2878 if self.allowHixie76: 2879 version = 0 2880 else: 2881 return self.failHandshake("WebSocket connection denied - Hixie76 protocol mode disabled.") 2882 else: 2883 if self.debugCodePaths: 2884 self.factory._log("Hybi protocol detected") 2885 if http_headers_cnt["sec-websocket-version"] > 1: 2886 return self.failHandshake("HTTP Sec-WebSocket-Version header appears more than once in opening handshake request") 2887 try: 2888 version = int(self.http_headers["sec-websocket-version"]) 2889 except: 2890 return self.failHandshake("could not parse HTTP Sec-WebSocket-Version header '%s' in opening handshake request" % self.http_headers["sec-websocket-version"]) 2891 2892 if version not in self.versions: 2893 2894 ## respond with list of supported versions (descending order) 2895 ## 2896 sv = sorted(self.versions) 2897 sv.reverse() 2898 svs = ','.join([str(x) for x in sv]) 2899 return self.failHandshake("WebSocket version %d not supported (supported versions: %s)" % (version, svs), 2900 http.BAD_REQUEST[0], 2901 [("Sec-WebSocket-Version", svs)]) 2902 else: 2903 ## store the protocol version we are supposed to talk 2904 self.websocket_version = version 2905 2906 ## Sec-WebSocket-Protocol 2907 ## 2908 if 'sec-websocket-protocol' in self.http_headers: 2909 protocols = [str(x.strip()) for x in self.http_headers["sec-websocket-protocol"].split(",")] 2910 # check for duplicates in protocol header 2911 pp = {} 2912 for p in protocols: 2913 if p in pp: 2914 return self.failHandshake("duplicate protocol '%s' specified in HTTP Sec-WebSocket-Protocol header" % p) 2915 else: 2916 pp[p] = 1 2917 # ok, no duplicates, save list in order the client sent it 2918 self.websocket_protocols = protocols 2919 else: 2920 self.websocket_protocols = [] 2921 2922 ## Origin / Sec-WebSocket-Origin 2923 ## http://tools.ietf.org/html/draft-ietf-websec-origin-02 2924 ## 2925 if self.websocket_version < 13 and self.websocket_version != 0: 2926 # Hybi, but only < Hybi-13 2927 websocket_origin_header_key = 'sec-websocket-origin' 2928 else: 2929 # RFC6455, >= Hybi-13 and Hixie 2930 websocket_origin_header_key = "origin" 2931 2932 self.websocket_origin = None 2933 if websocket_origin_header_key in self.http_headers: 2934 if http_headers_cnt[websocket_origin_header_key] > 1: 2935 return self.failHandshake("HTTP Origin header appears more than once in opening handshake request") 2936 self.websocket_origin = self.http_headers[websocket_origin_header_key].strip() 2937 else: 2938 # non-browser clients are allowed to omit this header 2939 pass 2940 2941 ## Sec-WebSocket-Key (Hybi) or Sec-WebSocket-Key1/Sec-WebSocket-Key2 (Hixie-76) 2942 ## 2943 if self.websocket_version == 0: 2944 for kk in ['Sec-WebSocket-Key1', 'Sec-WebSocket-Key2']: 2945 k = kk.lower() 2946 if not k in self.http_headers: 2947 return self.failHandshake("HTTP %s header missing" % kk) 2948 if http_headers_cnt[k] > 1: 2949 return self.failHandshake("HTTP %s header appears more than once in opening handshake request" % kk) 2950 try: 2951 key1 = self.parseHixie76Key(self.http_headers["sec-websocket-key1"].strip()) 2952 key2 = self.parseHixie76Key(self.http_headers["sec-websocket-key2"].strip()) 2953 except: 2954 return self.failHandshake("could not parse Sec-WebSocket-Key1/2") 2955 else: 2956 if not 'sec-websocket-key' in self.http_headers: 2957 return self.failHandshake("HTTP Sec-WebSocket-Key header missing") 2958 if http_headers_cnt["sec-websocket-key"] > 1: 2959 return self.failHandshake("HTTP Sec-WebSocket-Key header appears more than once in opening handshake request") 2960 key = self.http_headers["sec-websocket-key"].strip() 2961 if len(key) != 24: # 16 bytes => (ceil(128/24)*24)/6 == 24 2962 return self.failHandshake("bad Sec-WebSocket-Key (length must be 24 ASCII chars) '%s'" % key) 2963 if key[-2:] != "==": # 24 - ceil(128/6) == 2 2964 return self.failHandshake("bad Sec-WebSocket-Key (invalid base64 encoding) '%s'" % key) 2965 for c in key[:-2]: 2966 if c not in "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+/": 2967 return self.failHandshake("bad character '%s' in Sec-WebSocket-Key (invalid base64 encoding) '%s'" % (c, key)) 2968 2969 ## Sec-WebSocket-Extensions 2970 ## 2971 self.websocket_extensions = [] 2972 if 'sec-websocket-extensions' in self.http_headers: 2973 2974 if self.websocket_version == 0: 2975 return self.failHandshake("HTTP Sec-WebSocket-Extensions header encountered for Hixie-76") 2976 else: 2977 if http_headers_cnt["sec-websocket-extensions"] > 1: 2978 return self.failHandshake("HTTP Sec-WebSocket-Extensions header appears more than once in opening handshake request") 2979 else: 2980 ## extensions requested/offered by client 2981 ## 2982 self.websocket_extensions = self._parseExtensionsHeader(self.http_headers["sec-websocket-extensions"]) 2983 2984 ## For Hixie-76, we need 8 octets of HTTP request body to complete HS! 2985 ## 2986 if self.websocket_version == 0: 2987 if len(self.data) < end_of_header + 4 + 8: 2988 return 2989 else: 2990 key3 = self.data[end_of_header + 4:end_of_header + 4 + 8] 2991 if self.debug: 2992 self.factory._log("received HTTP request body containing key3 for Hixie-76: %s" % key3) 2993 2994 ## Ok, got complete HS input, remember rest (if any) 2995 ## 2996 if self.websocket_version == 0: 2997 self.data = self.data[end_of_header + 4 + 8:] 2998 else: 2999 self.data = self.data[end_of_header + 4:] 3000 3001 ## store WS key 3002 ## 3003 if self.websocket_version == 0: 3004 self._wskey = (key1, key2, key3) 3005 else: 3006 self._wskey = key 3007 3008 ## WebSocket handshake validated => produce opening handshake response 3009 3010 ## Now fire onConnect() on derived class, to give that class a chance to accept or deny 3011 ## the connection. onConnect() may throw, in which case the connection is denied, or it 3012 ## may return a protocol from the protocols provided by client or None. 3013 ## 3014 request = ConnectionRequest(self.peer, 3015 self.http_headers, 3016 self.http_request_host, 3017 self.http_request_path, 3018 self.http_request_params, 3019 self.websocket_version, 3020 self.websocket_origin, 3021 self.websocket_protocols, 3022 self.websocket_extensions) 3023 self._onConnect(request) 3024 3025 3026 def succeedHandshake(self, res): 3027 """ 3028 Callback after onConnect() returns successfully. Generates the response for the handshake. 3029 """ 3030 protocol = None 3031 headers = {} 3032 if type(res) == tuple: 3033 if len(res) > 0: 3034 protocol = res[0] 3035 if len(res) > 1: 3036 headers = res[1] 3037 else: 3038 protocol = res 3039 3040 if protocol is not None and not (protocol in self.websocket_protocols): 3041 raise Exception("protocol accepted must be from the list client sent or None") 3042 3043 self.websocket_protocol_in_use = protocol 3044 3045 if self.websocket_version == 0: 3046 key1, key2, key3 = self._wskey 3047 else: 3048 key = self._wskey 3049 3050 3051 ## extensions effectively in use for this connection 3052 ## 3053 self.websocket_extensions_in_use = [] 3054 3055 extensionResponse = [] 3056 3057 ## gets filled with permessage-compress offers from the client 3058 ## 3059 pmceOffers = [] 3060 3061 ## handle WebSocket extensions 3062 ## 3063 for (extension, params) in self.websocket_extensions: 3064 3065 if self.debug: 3066 self.factory._log("parsed WebSocket extension '%s' with params '%s'" % (extension, params)) 3067 3068 ## process permessage-compress extension 3069 ## 3070 if extension in PERMESSAGE_COMPRESSION_EXTENSION: 3071 3072 PMCE = PERMESSAGE_COMPRESSION_EXTENSION[extension] 3073 3074 try: 3075 offer = PMCE['Offer'].parse(params) 3076 pmceOffers.append(offer) 3077 except Exception as e: 3078 return self.failHandshake(str(e)) 3079 3080 else: 3081 if self.debug: 3082 self.factory._log("client requested '%s' extension we don't support or which is not activated" % extension) 3083 3084 ## handle permessage-compress offers by the client 3085 ## 3086 if len(pmceOffers) > 0: 3087 accept = self.perMessageCompressionAccept(pmceOffers) 3088 if accept is not None: 3089 PMCE = PERMESSAGE_COMPRESSION_EXTENSION[accept.EXTENSION_NAME] 3090 self._perMessageCompress = PMCE['PMCE'].createFromOfferAccept(self.factory.isServer, accept) 3091 self.websocket_extensions_in_use.append(self._perMessageCompress) 3092 extensionResponse.append(accept.getExtensionString()) 3093 else: 3094 if self.debug: 3095 self.factory._log("client request permessage-compress extension, but we did not accept any offer [%s]" % pmceOffers) 3096 3097 3098 ## build response to complete WebSocket handshake 3099 ## 3100 response = "HTTP/1.1 %d Switching Protocols\x0d\x0a" % http.SWITCHING_PROTOCOLS[0] 3101 3102 if self.factory.server is not None and self.factory.server != "": 3103 response += "Server: %s\x0d\x0a" % self.factory.server 3104 3105 response += "Upgrade: WebSocket\x0d\x0a" 3106 response += "Connection: Upgrade\x0d\x0a" 3107 3108 ## optional, user supplied additional HTTP headers 3109 ## 3110 ## headers from factory 3111 for uh in self.factory.headers.items(): 3112 response += "%s: %s\x0d\x0a" % (uh[0], uh[1]) 3113 ## headers from onConnect 3114 for uh in headers.items(): 3115 response += "%s: %s\x0d\x0a" % (uh[0], uh[1]) 3116 3117 if self.websocket_protocol_in_use is not None: 3118 response += "Sec-WebSocket-Protocol: %s\x0d\x0a" % str(self.websocket_protocol_in_use) 3119 3120 if self.websocket_version == 0: 3121 3122 if self.websocket_origin: 3123 ## browser client provide the header, and expect it to be echo'ed 3124 response += "Sec-WebSocket-Origin: %s\x0d\x0a" % str(self.websocket_origin) 3125 3126 if self.debugCodePaths: 3127 self.factory._log('factory isSecure = %s port = %s' % (self.factory.isSecure, self.factory.externalPort)) 3128 3129 if self.factory.externalPort and ((self.factory.isSecure and self.factory.externalPort != 443) or ((not self.factory.isSecure) and self.factory.externalPort != 80)): 3130 if self.debugCodePaths: 3131 self.factory._log('factory running on non-default port') 3132 response_port = ':' + str(self.factory.externalPort) 3133 else: 3134 if self.debugCodePaths: 3135 self.factory._log('factory running on default port') 3136 response_port = '' 3137 3138 ## FIXME: check this! But see below .. 3139 if False: 3140 response_host = str(self.factory.host) 3141 response_path = str(self.factory.path) 3142 else: 3143 response_host = str(self.http_request_host) 3144 response_path = str(self.http_request_uri) 3145 3146 location = "%s://%s%s%s" % ('wss' if self.factory.isSecure else 'ws', response_host, response_port, response_path) 3147 3148 # Safari is very picky about this one 3149 response += "Sec-WebSocket-Location: %s\x0d\x0a" % location 3150 3151 ## end of HTTP response headers 3152 response += "\x0d\x0a" 3153 3154 ## compute accept body 3155 ## 3156 accept_val = struct.pack(">II", key1, key2) + key3 3157 response_body = hashlib.md5(accept_val).digest() 3158 3159 else: 3160 ## compute Sec-WebSocket-Accept 3161 ## 3162 sha1 = hashlib.sha1() 3163 sha1.update(key.encode('utf8') + WebSocketProtocol._WS_MAGIC) 3164 sec_websocket_accept = base64.b64encode(sha1.digest()) 3165 3166 response += "Sec-WebSocket-Accept: %s\x0d\x0a" % sec_websocket_accept.decode() 3167 3168 ## agreed extensions 3169 ## 3170 if len(extensionResponse) > 0: 3171 response += "Sec-WebSocket-Extensions: %s\x0d\x0a" % ', '.join(extensionResponse) 3172 3173 ## end of HTTP response headers 3174 response += "\x0d\x0a" 3175 response_body = None 3176 3177 ## send out opening handshake response 3178 ## 3179 if self.debug: 3180 self.factory._log("sending HTTP response:\n\n%s" % response) 3181 self.sendData(response.encode('utf8')) 3182 3183 if response_body: 3184 if self.debug: 3185 self.factory._log("sending HTTP response body:\n\n%s" % binascii.b2a_hex(response_body)) 3186 self.sendData(response_body) 3187 3188 ## save response for testsuite 3189 ## 3190 self.http_response_data = response 3191 3192 ## opening handshake completed, move WebSocket connection into OPEN state 3193 ## 3194 self.state = WebSocketProtocol.STATE_OPEN 3195 3196 ## cancel any opening HS timer if present 3197 ## 3198 if self.openHandshakeTimeoutCall is not None: 3199 if self.debugCodePaths: 3200 self.factory._log("openHandshakeTimeoutCall.cancel") 3201 self.openHandshakeTimeoutCall.cancel() 3202 self.openHandshakeTimeoutCall = None 3203 3204 ## init state 3205 ## 3206 self.inside_message = False 3207 if self.websocket_version != 0: 3208 self.current_frame = None 3209 3210 ## fire handler on derived class 3211 ## 3212 if self.trackedTimings: 3213 self.trackedTimings.track("onOpen") 3214 self._onOpen() 3215 3216 ## process rest, if any 3217 ## 3218 if len(self.data) > 0: 3219 self.consumeData() 3220 3221 3222 def failHandshake(self, reason, code = http.BAD_REQUEST[0], responseHeaders = None): 3223 """ 3224 During opening handshake the client request was invalid, we send a HTTP 3225 error response and then drop the connection. 3226 """ 3227 if self.debug: 3228 self.factory._log("failing WebSocket opening handshake ('%s')" % reason) 3229 self.sendHttpErrorResponse(code, reason, responseHeaders) 3230 self.dropConnection(abort = False) 3231 3232 3233 def sendHttpErrorResponse(self, code, reason, responseHeaders = None): 3234 """ 3235 Send out HTTP error response. 3236 """ 3237 response = "HTTP/1.1 {0} {1}\x0d\x0a".format(code, reason) 3238 if responseHeaders: 3239 for h in responseHeaders: 3240 response += "{0}: {1}\x0d\x0a".format(h[0], h[1]) 3241 response += "\x0d\x0a" 3242 self.sendData(response.encode('utf8')) 3243 3244 3245 def sendHtml(self, html): 3246 """ 3247 Send HTML page HTTP response. 3248 """ 3249 responseBody = html.encode('utf8') 3250 response = "HTTP/1.1 %d %s\x0d\x0a" % (http.OK[0], http.OK[1]) 3251 if self.factory.server is not None and self.factory.server != "": 3252 response += "Server: %s\x0d\x0a" % self.factory.server 3253 response += "Content-Type: text/html; charset=UTF-8\x0d\x0a" 3254 response += "Content-Length: %d\x0d\x0a" % len(responseBody) 3255 response += "\x0d\x0a" 3256 self.sendData(response.encode('utf8')) 3257 self.sendData(responseBody) 3258 3259 3260 def sendRedirect(self, url): 3261 """ 3262 Send HTTP Redirect (303) response. 3263 """ 3264 response = "HTTP/1.1 %d\x0d\x0a" % http.SEE_OTHER[0] 3265 if self.factory.server is not None and self.factory.server != "": 3266 response += "Server: %s\x0d\x0a" % self.factory.server 3267 response += "Location: %s\x0d\x0a" % url 3268 response += "\x0d\x0a" 3269 self.sendData(response.encode('utf8')) 3270 3271 3272 def sendServerStatus(self, redirectUrl = None, redirectAfter = 0): 3273 """ 3274 Used to send out server status/version upon receiving a HTTP/GET without 3275 upgrade to WebSocket header (and option serverStatus is True). 3276 """ 3277 if redirectUrl: 3278 redirect = """<meta http-equiv="refresh" content="%d;URL='%s'">""" % (redirectAfter, redirectUrl) 3279 else: 3280 redirect = "" 3281 html = """ 3282<!DOCTYPE html> 3283<html> 3284 <head> 3285 %s 3286 <style> 3287 body { 3288 color: #fff; 3289 background-color: #027eae; 3290 font-family: "Segoe UI", "Lucida Grande", "Helvetica Neue", Helvetica, Arial, sans-serif; 3291 font-size: 16px; 3292 } 3293 3294 a, a:visited, a:hover { 3295 color: #fff; 3296 } 3297 </style> 3298 </head> 3299 <body> 3300 <h1>AutobahnPython %s</h1> 3301 <p> 3302 I am not Web server, but a <b>WebSocket Endpoint</b>. 3303 </p> 3304 <p> 3305 You can talk to me using the <a href="http://tools.ietf.org/html/rfc6455">WebSocket</a> protocol. 3306 </p> 3307 <p> 3308 For more information, please see: 3309 <ul> 3310 <li><a href="http://autobahn.ws/python">AutobahnPython</a></li> 3311 </ul> 3312 </p> 3313 </body> 3314</html> 3315""" % (redirect, __version__) 3316 self.sendHtml(html) 3317 3318 3319 3320class WebSocketServerFactory(WebSocketFactory): 3321 """ 3322 A protocol factory for WebSocket servers. 3323 """ 3324 3325 protocol = WebSocketServerProtocol 3326 """ 3327 The protocol to be spoken. Must be derived from :class:`autobahn.websocket.protocol.WebSocketServerProtocol`. 3328 """ 3329 3330 isServer = True 3331 """ 3332 Flag indicating if this factory is client- or server-side. 3333 """ 3334 3335 3336 def __init__(self, 3337 url = None, 3338 protocols = None, 3339 server = "AutobahnPython/%s" % __version__, 3340 headers = None, 3341 externalPort = None, 3342 debug = False, 3343 debugCodePaths = False): 3344 """ 3345 Create instance of WebSocket server factory. 3346 3347 :param url: The WebSocket URL this factory is working for, e.g. `ws://myhost.com/somepath`. 3348 For non-TCP transports like pipes or Unix domain sockets, provide `None`. 3349 This will use an implicit URL of `ws://localhost`. 3350 :type url: str 3351 :param protocols: List of subprotocols the server supports. The subprotocol used is the first from the list of subprotocols announced by the client that is contained in this list. 3352 :type protocols: list of strings 3353 :param server: Server as announced in HTTP response header during opening handshake or None (default: `AutobahnWebSocket/?.?.?`). 3354 :type server: str 3355 :param headers: An optional mapping of additional HTTP headers to send during the WebSocket opening handshake. 3356 :type headers: dict 3357 :param externalPort: Optionally, the external visible port this factory will be reachable under (i.e. when running behind a L2/L3 forwarding device). 3358 :type externalPort: int 3359 :param debug: Debug mode (default: `False`). 3360 :type debug: bool 3361 :param debugCodePaths: Debug code paths mode (default: `False`). 3362 :type debugCodePaths: bool 3363 """ 3364 self.debug = debug 3365 self.debugCodePaths = debugCodePaths 3366 3367 self.logOctets = debug 3368 self.logFrames = debug 3369 3370 self.trackTimings = False 3371 3372 ## seed RNG which is used for WS frame masks generation 3373 random.seed() 3374 3375 ## default WS session parameters 3376 ## 3377 self.setSessionParameters(url, protocols, server, headers, externalPort) 3378 3379 ## default WebSocket protocol options 3380 ## 3381 self.resetProtocolOptions() 3382 3383 ## number of currently connected clients 3384 ## 3385 self.countConnections = 0 3386 3387 3388 def setSessionParameters(self, 3389 url = None, 3390 protocols = None, 3391 server = None, 3392 headers = None, 3393 externalPort = None): 3394 """ 3395 Set WebSocket session parameters. 3396 3397 :param url: The WebSocket URL this factory is working for, e.g. `ws://myhost.com/somepath`. 3398 For non-TCP transports like pipes or Unix domain sockets, provide `None`. 3399 This will use an implicit URL of `ws://localhost`. 3400 :type url: str 3401 :param protocols: List of subprotocols the server supports. The subprotocol used is the first from the list of subprotocols announced by the client that is contained in this list. 3402 :type protocols: list of strings 3403 :param server: Server as announced in HTTP response header during opening handshake. 3404 :type server: str 3405 :param headers: An optional mapping of additional HTTP headers to send during the WebSocket opening handshake. 3406 :type headers: dict 3407 :param externalPort: Optionally, the external visible port this server will be reachable under (i.e. when running behind a L2/L3 forwarding device). 3408 :type externalPort: int 3409 """ 3410 ## parse WebSocket URI into components 3411 (isSecure, host, port, resource, path, params) = parseWsUrl(url or "ws://localhost") 3412 if len(params) > 0: 3413 raise Exception("query parameters specified for server WebSocket URL") 3414 self.url = url 3415 self.isSecure = isSecure 3416 self.host = host 3417 self.port = port 3418 self.resource = resource 3419 self.path = path 3420 self.params = params 3421 3422 self.protocols = protocols or [] 3423 self.server = server 3424 self.headers = headers or {} 3425 3426 if externalPort: 3427 self.externalPort = externalPort 3428 elif url: 3429 self.externalPort = self.port 3430 else: 3431 self.externalPort = None 3432 3433 3434 def resetProtocolOptions(self): 3435 """ 3436 Reset all WebSocket protocol options to defaults. 3437 """ 3438 self.versions = WebSocketProtocol.SUPPORTED_PROTOCOL_VERSIONS 3439 self.allowHixie76 = WebSocketProtocol.DEFAULT_ALLOW_HIXIE76 3440 self.webStatus = True 3441 self.utf8validateIncoming = True 3442 self.requireMaskedClientFrames = True 3443 self.maskServerFrames = False 3444 self.applyMask = True 3445 self.maxFramePayloadSize = 0 3446 self.maxMessagePayloadSize = 0 3447 self.autoFragmentSize = 0 3448 self.failByDrop = True 3449 self.echoCloseCodeReason = False 3450 self.openHandshakeTimeout = 5 3451 self.closeHandshakeTimeout = 1 3452 self.tcpNoDelay = True 3453 3454 ## permessage-XXX extension 3455 ## 3456 self.perMessageCompressionAccept = lambda _: None 3457 3458 3459 def setProtocolOptions(self, 3460 versions = None, 3461 allowHixie76 = None, 3462 webStatus = None, 3463 utf8validateIncoming = None, 3464 maskServerFrames = None, 3465 requireMaskedClientFrames = None, 3466 applyMask = None, 3467 maxFramePayloadSize = None, 3468 maxMessagePayloadSize = None, 3469 autoFragmentSize = None, 3470 failByDrop = None, 3471 echoCloseCodeReason = None, 3472 openHandshakeTimeout = None, 3473 closeHandshakeTimeout = None, 3474 tcpNoDelay = None, 3475 perMessageCompressionAccept = None): 3476 """ 3477 Set WebSocket protocol options used as defaults for new protocol instances. 3478 3479 :param versions: The WebSocket protocol versions accepted by the server (default: :func:`autobahn.websocket.protocol.WebSocketProtocol.SUPPORTED_PROTOCOL_VERSIONS`). 3480 :type versions: list of ints 3481 :param allowHixie76: Allow to speak Hixie76 protocol version. 3482 :type allowHixie76: bool 3483 :param webStatus: Return server status/version on HTTP/GET without WebSocket upgrade header (default: `True`). 3484 :type webStatus: bool 3485 :param utf8validateIncoming: Validate incoming UTF-8 in text message payloads (default: `True`). 3486 :type utf8validateIncoming: bool 3487 :param maskServerFrames: Mask server-to-client frames (default: `False`). 3488 :type maskServerFrames: bool 3489 :param requireMaskedClientFrames: Require client-to-server frames to be masked (default: `True`). 3490 :type requireMaskedClientFrames: bool 3491 :param applyMask: Actually apply mask to payload when mask it present. Applies for outgoing and incoming frames (default: `True`). 3492 :type applyMask: bool 3493 :param maxFramePayloadSize: Maximum frame payload size that will be accepted when receiving or `0` for unlimited (default: `0`). 3494 :type maxFramePayloadSize: int 3495 :param maxMessagePayloadSize: Maximum message payload size (after reassembly of fragmented messages) that will be accepted when receiving or `0` for unlimited (default: `0`). 3496 :type maxMessagePayloadSize: int 3497 :param autoFragmentSize: Automatic fragmentation of outgoing data messages (when using the message-based API) into frames with payload length `<=` this size or `0` for no auto-fragmentation (default: `0`). 3498 :type autoFragmentSize: int 3499 :param failByDrop: Fail connections by dropping the TCP connection without performing closing handshake (default: `True`). 3500 :type failbyDrop: bool 3501 :param echoCloseCodeReason: Iff true, when receiving a close, echo back close code/reason. Otherwise reply with `code == 1000, reason = ""` (default: `False`). 3502 :type echoCloseCodeReason: bool 3503 :param openHandshakeTimeout: Opening WebSocket handshake timeout, timeout in seconds or `0` to deactivate (default: `0`). 3504 :type openHandshakeTimeout: float 3505 :param closeHandshakeTimeout: When we expect to receive a closing handshake reply, timeout in seconds (default: `1`). 3506 :type closeHandshakeTimeout: float 3507 :param tcpNoDelay: TCP NODELAY ("Nagle") socket option (default: `True`). 3508 :type tcpNoDelay: bool 3509 :param perMessageCompressionAccept: Acceptor function for offers. 3510 :type perMessageCompressionAccept: callable 3511 """ 3512 if allowHixie76 is not None and allowHixie76 != self.allowHixie76: 3513 self.allowHixie76 = allowHixie76 3514 3515 if versions is not None: 3516 for v in versions: 3517 if v not in WebSocketProtocol.SUPPORTED_PROTOCOL_VERSIONS: 3518 raise Exception("invalid WebSocket protocol version %s (allowed values: %s)" % (v, str(WebSocketProtocol.SUPPORTED_PROTOCOL_VERSIONS))) 3519 if v == 0 and not self.allowHixie76: 3520 raise Exception("use of Hixie-76 requires allowHixie76 == True") 3521 if set(versions) != set(self.versions): 3522 self.versions = versions 3523 3524 if webStatus is not None and webStatus != self.webStatus: 3525 self.webStatus = webStatus 3526 3527 if utf8validateIncoming is not None and utf8validateIncoming != self.utf8validateIncoming: 3528 self.utf8validateIncoming = utf8validateIncoming 3529 3530 if requireMaskedClientFrames is not None and requireMaskedClientFrames != self.requireMaskedClientFrames: 3531 self.requireMaskedClientFrames = requireMaskedClientFrames 3532 3533 if maskServerFrames is not None and maskServerFrames != self.maskServerFrames: 3534 self.maskServerFrames = maskServerFrames 3535 3536 if applyMask is not None and applyMask != self.applyMask: 3537 self.applyMask = applyMask 3538 3539 if maxFramePayloadSize is not None and maxFramePayloadSize != self.maxFramePayloadSize: 3540 self.maxFramePayloadSize = maxFramePayloadSize 3541 3542 if maxMessagePayloadSize is not None and maxMessagePayloadSize != self.maxMessagePayloadSize: 3543 self.maxMessagePayloadSize = maxMessagePayloadSize 3544 3545 if autoFragmentSize is not None and autoFragmentSize != self.autoFragmentSize: 3546 self.autoFragmentSize = autoFragmentSize 3547 3548 if failByDrop is not None and failByDrop != self.failByDrop: 3549 self.failByDrop = failByDrop 3550 3551 if echoCloseCodeReason is not None and echoCloseCodeReason != self.echoCloseCodeReason: 3552 self.echoCloseCodeReason = echoCloseCodeReason 3553 3554 if openHandshakeTimeout is not None and openHandshakeTimeout != self.openHandshakeTimeout: 3555 self.openHandshakeTimeout = openHandshakeTimeout 3556 3557 if closeHandshakeTimeout is not None and closeHandshakeTimeout != self.closeHandshakeTimeout: 3558 self.closeHandshakeTimeout = closeHandshakeTimeout 3559 3560 if tcpNoDelay is not None and tcpNoDelay != self.tcpNoDelay: 3561 self.tcpNoDelay = tcpNoDelay 3562 3563 if perMessageCompressionAccept is not None and perMessageCompressionAccept != self.perMessageCompressionAccept: 3564 self.perMessageCompressionAccept = perMessageCompressionAccept 3565 3566 3567 def getConnectionCount(self): 3568 """ 3569 Get number of currently connected clients. 3570 3571 :returns: int -- Number of currently connected clients. 3572 """ 3573 return self.countConnections 3574 3575 3576 3577class WebSocketClientProtocol(WebSocketProtocol): 3578 """ 3579 Protocol base class for WebSocket clients. 3580 """ 3581 3582 CONFIG_ATTRS = WebSocketProtocol.CONFIG_ATTRS_COMMON + WebSocketProtocol.CONFIG_ATTRS_CLIENT 3583 3584 3585 def onConnect(self, response): 3586 """ 3587 Callback fired directly after WebSocket opening handshake when new WebSocket server 3588 connection was established. 3589 3590 :param response: WebSocket connection response information. 3591 :type response: instance of :class:`autobahn.websocket.protocol.ConnectionResponse` 3592 """ 3593 pass 3594 3595 3596 def _connectionMade(self): 3597 """ 3598 Called by network framework when new transport connection to server was established. Default 3599 implementation will start the initial WebSocket opening handshake (or proxy connect). 3600 When overriding in derived class, make sure to call this base class 3601 implementation _before_ your code. 3602 """ 3603 WebSocketProtocol._connectionMade(self) 3604 if self.debug: 3605 self.factory._log("connection to %s established" % self.peer) 3606 3607 if not self.factory.isServer and self.factory.proxy is not None: 3608 ## start by doing a HTTP/CONNECT for explicit proxies 3609 self.startProxyConnect() 3610 else: 3611 ## immediately start with the WebSocket opening handshake 3612 self.startHandshake() 3613 3614 3615 def _connectionLost(self, reason): 3616 """ 3617 Called by network framework when established transport connection to server was lost. Default 3618 implementation will tear down all state properly. 3619 When overriding in derived class, make sure to call this base class 3620 implementation _after_ your code. 3621 """ 3622 WebSocketProtocol._connectionLost(self, reason) 3623 if self.debug: 3624 self.factory._log("connection to %s lost" % self.peer) 3625 3626 3627 def startProxyConnect(self): 3628 """ 3629 Connect to explicit proxy. 3630 """ 3631 3632 ## construct proxy connect HTTP request 3633 ## 3634 request = "CONNECT %s:%d HTTP/1.1\x0d\x0a" % (self.factory.host.encode("utf-8"), self.factory.port) 3635 request += "Host: %s:%d\x0d\x0a" % (self.factory.host.encode("utf-8"), self.factory.port) 3636 request += "\x0d\x0a" 3637 3638 if self.debug: 3639 self.factory._log(request) 3640 3641 self.sendData(request) 3642 3643 3644 def processProxyConnect(self): 3645 """ 3646 Process HTTP/CONNECT response from server. 3647 """ 3648 ## only proceed when we have fully received the HTTP request line and all headers 3649 ## 3650 end_of_header = self.data.find(b"\x0d\x0a\x0d\x0a") 3651 if end_of_header >= 0: 3652 3653 http_response_data = self.data[:end_of_header + 4] 3654 if self.debug: 3655 self.factory._log("received HTTP response:\n\n%s\n\n" % http_response_data) 3656 3657 ## extract HTTP status line and headers 3658 ## 3659 (http_status_line, http_headers, http_headers_cnt) = parseHttpHeader(http_response_data) 3660 3661 ## validate proxy connect response 3662 ## 3663 if self.debug: 3664 self.factory._log("received HTTP status line for proxy connect request : %s" % str(http_status_line)) 3665 self.factory._log("received HTTP headers for proxy connect request : %s" % str(http_headers)) 3666 3667 ## Response Line 3668 ## 3669 sl = http_status_line.split() 3670 if len(sl) < 2: 3671 return self.failProxyConnect("Bad HTTP response status line '%s'" % http_status_line) 3672 3673 ## HTTP version 3674 ## 3675 http_version = sl[0].strip() 3676 if http_version != "HTTP/1.1": 3677 return self.failProxyConnect("Unsupported HTTP version ('%s')" % http_version) 3678 3679 ## HTTP status code 3680 ## 3681 try: 3682 status_code = int(sl[1].strip()) 3683 except: 3684 return self.failProxyConnect("Bad HTTP status code ('%s')" % sl[1].strip()) 3685 3686 if not (status_code >= 200 and status_code < 300): 3687 3688 ## FIXME: handle redirects 3689 ## FIXME: handle authentication required 3690 3691 if len(sl) > 2: 3692 reason = " - %s" % ''.join(sl[2:]) 3693 else: 3694 reason = "" 3695 return self.failProxyConnect("HTTP proxy connect failed (%d%s)" % (status_code, reason)) 3696 3697 3698 ## Ok, got complete response for HTTP/CONNECT, remember rest (if any) 3699 ## 3700 self.data = self.data[end_of_header + 4:] 3701 3702 ## opening handshake completed, move WebSocket connection into OPEN state 3703 ## 3704 self.state = WebSocketProtocol.STATE_CONNECTING 3705 3706 ## process rest of buffered data, if any 3707 ## 3708 if len(self.data) > 0: 3709 self.consumeData() 3710 3711 ## now start WebSocket opening handshake 3712 ## 3713 self.startHandshake() 3714 3715 3716 def failProxyConnect(self, reason): 3717 """ 3718 During initial explicit proxy connect, the server response indicates some failure and we drop the 3719 connection. 3720 """ 3721 if self.debug: 3722 self.factory._log("failing proxy connect ('%s')" % reason) 3723 self.dropConnection(abort = True) 3724 3725 3726 def createHixieKey(self): 3727 """ 3728 Implements this algorithm: 3729 3730 http://tools.ietf.org/html/draft-hixie-thewebsocketprotocol-76#page-21 3731 Items 16 - 22 3732 """ 3733 spaces1 = random.randint(1, 12) 3734 max1 = int(4294967295 / spaces1) 3735 number1 = random.randint(0, max1) 3736 product1 = number1 * spaces1 3737 key1 = str(product1) 3738 rchars = filter(lambda x: (x >= 0x21 and x <= 0x2f) or (x >= 0x3a and x <= 0x7e), range(0,127)) 3739 for i in xrange(random.randint(1, 12)): 3740 p = random.randint(0, len(key1) - 1) 3741 key1 = key1[:p] + chr(random.choice(rchars)) + key1[p:] 3742 for i in xrange(spaces1): 3743 p = random.randint(1, len(key1) - 2) 3744 key1 = key1[:p] + ' ' + key1[p:] 3745 return (key1, number1) 3746 3747 3748 def startHandshake(self): 3749 """ 3750 Start WebSocket opening handshake. 3751 """ 3752 3753 ## construct WS opening handshake HTTP header 3754 ## 3755 request = "GET %s HTTP/1.1\x0d\x0a" % self.factory.resource 3756 3757 if self.factory.useragent is not None and self.factory.useragent != "": 3758 request += "User-Agent: %s\x0d\x0a" % self.factory.useragent 3759 3760 request += "Host: %s:%d\x0d\x0a" % (self.factory.host, self.factory.port) 3761 request += "Upgrade: WebSocket\x0d\x0a" 3762 request += "Connection: Upgrade\x0d\x0a" 3763 3764 ## this seems to prohibit some non-compliant proxies from removing the 3765 ## connection "Upgrade" header 3766 ## See also: 3767 ## http://www.ietf.org/mail-archive/web/hybi/current/msg09841.html 3768 ## http://code.google.com/p/chromium/issues/detail?id=148908 3769 ## 3770 request += "Pragma: no-cache\x0d\x0a" 3771 request += "Cache-Control: no-cache\x0d\x0a" 3772 3773 ## optional, user supplied additional HTTP headers 3774 ## 3775 for uh in self.factory.headers.items(): 3776 request += "%s: %s\x0d\x0a" % (uh[0], uh[1]) 3777 3778 ## handshake random key 3779 ## 3780 if self.version == 0: 3781 (self.websocket_key1, number1) = self.createHixieKey() 3782 (self.websocket_key2, number2) = self.createHixieKey() 3783 self.websocket_key3 = os.urandom(8) 3784 accept_val = struct.pack(">II", number1, number2) + self.websocket_key3 3785 self.websocket_expected_challenge_response = hashlib.md5(accept_val).digest() 3786 3787 ## Safari does NOT set Content-Length, even though the body is 3788 ## non-empty, and the request unchunked. We do it. 3789 ## See also: http://www.ietf.org/mail-archive/web/hybi/current/msg02149.html 3790 request += "Content-Length: %s\x0d\x0a" % len(self.websocket_key3) 3791 3792 ## First two keys. 3793 request += "Sec-WebSocket-Key1: %s\x0d\x0a" % self.websocket_key1 3794 request += "Sec-WebSocket-Key2: %s\x0d\x0a" % self.websocket_key2 3795 request_body = self.websocket_key3 3796 else: 3797 self.websocket_key = base64.b64encode(os.urandom(16)) 3798 request += "Sec-WebSocket-Key: %s\x0d\x0a" % self.websocket_key.decode() 3799 request_body = None 3800 3801 ## optional origin announced 3802 ## 3803 if self.factory.origin: 3804 if self.version > 10 or self.version == 0: 3805 request += "Origin: %s\x0d\x0a" % self.factory.origin 3806 else: 3807 request += "Sec-WebSocket-Origin: %s\x0d\x0a" % self.factory.origin 3808 3809 ## optional list of WS subprotocols announced 3810 ## 3811 if len(self.factory.protocols) > 0: 3812 request += "Sec-WebSocket-Protocol: %s\x0d\x0a" % ','.join(self.factory.protocols) 3813 3814 ## extensions 3815 ## 3816 if self.version != 0: 3817 extensions = [] 3818 3819 ## permessage-compress offers 3820 ## 3821 for offer in self.perMessageCompressionOffers: 3822 extensions.append(offer.getExtensionString()) 3823 3824 if len(extensions) > 0: 3825 request += "Sec-WebSocket-Extensions: %s\x0d\x0a" % ', '.join(extensions) 3826 3827 ## set WS protocol version depending on WS spec version 3828 ## 3829 if self.version != 0: 3830 request += "Sec-WebSocket-Version: %d\x0d\x0a" % WebSocketProtocol.SPEC_TO_PROTOCOL_VERSION[self.version] 3831 3832 request += "\x0d\x0a" 3833 3834 self.http_request_data = request.encode('utf8') 3835 self.sendData(self.http_request_data) 3836 3837 if request_body: 3838 ## Write HTTP request body for Hixie-76 3839 self.sendData(request_body) 3840 3841 if self.debug: 3842 self.factory._log(request) 3843 3844 3845 def processHandshake(self): 3846 """ 3847 Process WebSocket opening handshake response from server. 3848 """ 3849 ## only proceed when we have fully received the HTTP request line and all headers 3850 ## 3851 end_of_header = self.data.find(b"\x0d\x0a\x0d\x0a") 3852 if end_of_header >= 0: 3853 3854 self.http_response_data = self.data[:end_of_header + 4] 3855 if self.debug: 3856 self.factory._log("received HTTP response:\n\n%s\n\n" % self.http_response_data) 3857 3858 ## extract HTTP status line and headers 3859 ## 3860 (self.http_status_line, self.http_headers, http_headers_cnt) = parseHttpHeader(self.http_response_data) 3861 3862 ## validate WebSocket opening handshake server response 3863 ## 3864 if self.debug: 3865 self.factory._log("received HTTP status line in opening handshake : %s" % str(self.http_status_line)) 3866 self.factory._log("received HTTP headers in opening handshake : %s" % str(self.http_headers)) 3867 3868 ## Response Line 3869 ## 3870 sl = self.http_status_line.split() 3871 if len(sl) < 2: 3872 return self.failHandshake("Bad HTTP response status line '%s'" % self.http_status_line) 3873 3874 ## HTTP version 3875 ## 3876 http_version = sl[0].strip() 3877 if http_version != "HTTP/1.1": 3878 return self.failHandshake("Unsupported HTTP version ('%s')" % http_version) 3879 3880 ## HTTP status code 3881 ## 3882 try: 3883 status_code = int(sl[1].strip()) 3884 except: 3885 return self.failHandshake("Bad HTTP status code ('%s')" % sl[1].strip()) 3886 if status_code != http.SWITCHING_PROTOCOLS[0]: 3887 3888 ## FIXME: handle redirects 3889 ## FIXME: handle authentication required 3890 3891 if len(sl) > 2: 3892 reason = " - %s" % ''.join(sl[2:]) 3893 else: 3894 reason = "" 3895 return self.failHandshake("WebSocket connection upgrade failed (%d%s)" % (status_code, reason)) 3896 3897 ## Upgrade 3898 ## 3899 if not 'upgrade' in self.http_headers: 3900 return self.failHandshake("HTTP Upgrade header missing") 3901 if self.http_headers["upgrade"].strip().lower() != "websocket": 3902 return self.failHandshake("HTTP Upgrade header different from 'websocket' (case-insensitive) : %s" % self.http_headers["upgrade"]) 3903 3904 ## Connection 3905 ## 3906 if not 'connection' in self.http_headers: 3907 return self.failHandshake("HTTP Connection header missing") 3908 connectionUpgrade = False 3909 for c in self.http_headers["connection"].split(","): 3910 if c.strip().lower() == "upgrade": 3911 connectionUpgrade = True 3912 break 3913 if not connectionUpgrade: 3914 return self.failHandshake("HTTP Connection header does not include 'upgrade' value (case-insensitive) : %s" % self.http_headers["connection"]) 3915 3916 ## compute Sec-WebSocket-Accept 3917 ## 3918 if self.version != 0: 3919 if not 'sec-websocket-accept' in self.http_headers: 3920 return self.failHandshake("HTTP Sec-WebSocket-Accept header missing in opening handshake reply") 3921 else: 3922 if http_headers_cnt["sec-websocket-accept"] > 1: 3923 return self.failHandshake("HTTP Sec-WebSocket-Accept header appears more than once in opening handshake reply") 3924 sec_websocket_accept_got = self.http_headers["sec-websocket-accept"].strip() 3925 3926 sha1 = hashlib.sha1() 3927 sha1.update(self.websocket_key + WebSocketProtocol._WS_MAGIC) 3928 sec_websocket_accept = base64.b64encode(sha1.digest()).decode() 3929 3930 if sec_websocket_accept_got != sec_websocket_accept: 3931 return self.failHandshake("HTTP Sec-WebSocket-Accept bogus value : expected %s / got %s" % (sec_websocket_accept, sec_websocket_accept_got)) 3932 3933 ## Sec-WebSocket-Extensions 3934 ## 3935 3936 ## extensions effectively in use for this connection 3937 ## 3938 self.websocket_extensions_in_use = [] 3939 3940 if 'sec-websocket-extensions' in self.http_headers: 3941 3942 if self.version == 0: 3943 return self.failHandshake("HTTP Sec-WebSocket-Extensions header encountered for Hixie-76") 3944 else: 3945 if http_headers_cnt["sec-websocket-extensions"] > 1: 3946 return self.failHandshake("HTTP Sec-WebSocket-Extensions header appears more than once in opening handshake reply") 3947 else: 3948 ## extensions select by server 3949 ## 3950 websocket_extensions = self._parseExtensionsHeader(self.http_headers["sec-websocket-extensions"]) 3951 3952 ## process extensions selected by server 3953 ## 3954 for (extension, params) in websocket_extensions: 3955 3956 if self.debug: 3957 self.factory._log("parsed WebSocket extension '%s' with params '%s'" % (extension, params)) 3958 3959 ## process permessage-compress extension 3960 ## 3961 if extension in PERMESSAGE_COMPRESSION_EXTENSION: 3962 3963 ## check that server only responded with 1 configuration ("PMCE") 3964 ## 3965 if self._perMessageCompress is not None: 3966 return self.failHandshake("multiple occurence of a permessage-compress extension") 3967 3968 PMCE = PERMESSAGE_COMPRESSION_EXTENSION[extension] 3969 3970 try: 3971 pmceResponse = PMCE['Response'].parse(params) 3972 except Exception as e: 3973 return self.failHandshake(str(e)) 3974 3975 accept = self.perMessageCompressionAccept(pmceResponse) 3976 3977 if accept is None: 3978 return self.failHandshake("WebSocket permessage-compress extension response from server denied by client") 3979 3980 self._perMessageCompress = PMCE['PMCE'].createFromResponseAccept(self.factory.isServer, accept) 3981 3982 self.websocket_extensions_in_use.append(self._perMessageCompress) 3983 3984 else: 3985 return self.failHandshake("server wants to use extension '%s' we did not request, haven't implemented or did not enable" % extension) 3986 3987 ## handle "subprotocol in use" - if any 3988 ## 3989 self.websocket_protocol_in_use = None 3990 if 'sec-websocket-protocol' in self.http_headers: 3991 if http_headers_cnt["sec-websocket-protocol"] > 1: 3992 return self.failHandshake("HTTP Sec-WebSocket-Protocol header appears more than once in opening handshake reply") 3993 sp = str(self.http_headers["sec-websocket-protocol"].strip()) 3994 if sp != "": 3995 if sp not in self.factory.protocols: 3996 return self.failHandshake("subprotocol selected by server (%s) not in subprotocol list requested by client (%s)" % (sp, str(self.factory.protocols))) 3997 else: 3998 ## ok, subprotocol in use 3999 ## 4000 self.websocket_protocol_in_use = sp 4001 4002 ## For Hixie-76, we need 16 octets of HTTP request body to complete HS! 4003 ## 4004 if self.version == 0: 4005 if len(self.data) < end_of_header + 4 + 16: 4006 return 4007 else: 4008 challenge_response = self.data[end_of_header + 4:end_of_header + 4 + 16] 4009 if challenge_response != self.websocket_expected_challenge_response: 4010 return self.failHandshake("invalid challenge response received from server (Hixie-76)") 4011 4012 ## Ok, got complete HS input, remember rest (if any) 4013 ## 4014 if self.version == 0: 4015 self.data = self.data[end_of_header + 4 + 16:] 4016 else: 4017 self.data = self.data[end_of_header + 4:] 4018 4019 ## opening handshake completed, move WebSocket connection into OPEN state 4020 ## 4021 self.state = WebSocketProtocol.STATE_OPEN 4022 self.inside_message = False 4023 if self.version != 0: 4024 self.current_frame = None 4025 self.websocket_version = self.version 4026 4027 ## we handle this symmetrical to server-side .. that is, give the 4028 ## client a chance to bail out .. i.e. on no subprotocol selected 4029 ## by server 4030 try: 4031 response = ConnectionResponse(self.peer, 4032 self.http_headers, 4033 None, # FIXME 4034 self.websocket_protocol_in_use, 4035 self.websocket_extensions_in_use) 4036 4037 self._onConnect(response) 4038 4039 except Exception as e: 4040 ## immediately close the WS connection 4041 ## 4042 self.failConnection(1000, str(e)) 4043 else: 4044 ## fire handler on derived class 4045 ## 4046 if self.trackedTimings: 4047 self.trackedTimings.track("onOpen") 4048 self._onOpen() 4049 4050 ## process rest, if any 4051 ## 4052 if len(self.data) > 0: 4053 self.consumeData() 4054 4055 4056 def failHandshake(self, reason): 4057 """ 4058 During opening handshake the server response is invalid and we drop the 4059 connection. 4060 """ 4061 if self.debug: 4062 self.factory._log("failing WebSocket opening handshake ('%s')" % reason) 4063 self.dropConnection(abort = True) 4064 4065 4066 4067class WebSocketClientFactory(WebSocketFactory): 4068 """ 4069 A protocol factory for WebSocket clients. 4070 """ 4071 4072 protocol = WebSocketClientProtocol 4073 """ 4074 The protocol to be spoken. Must be derived from :class:`autobahn.websocket.protocol.WebSocketClientProtocol`. 4075 """ 4076 4077 isServer = False 4078 """ 4079 Flag indicating if this factory is client- or server-side. 4080 """ 4081 4082 4083 def __init__(self, 4084 url = None, 4085 origin = None, 4086 protocols = None, 4087 useragent = "AutobahnPython/%s" % __version__, 4088 headers = None, 4089 proxy = None, 4090 debug = False, 4091 debugCodePaths = False): 4092 """ 4093 Create instance of WebSocket client factory. 4094 4095 Note that you MUST provide URL either here or set using 4096 :meth:`autobahn.websocket.WebSocketClientFactory.setSessionParameters` 4097 *before* the factory is started. 4098 4099 :param url: WebSocket URL this factory will connect to, e.g. `ws://myhost.com/somepath?param1=23`. 4100 For non-TCP transports like pipes or Unix domain sockets, provide `None`. 4101 This will use an implicit URL of `ws://localhost`. 4102 :type url: str 4103 :param origin: The origin to be sent in WebSocket opening handshake or None (default: `None`). 4104 :type origin: str 4105 :param protocols: List of subprotocols the client should announce in WebSocket opening handshake (default: `[]`). 4106 :type protocols: list of strings 4107 :param useragent: User agent as announced in HTTP request header or None (default: `AutobahnWebSocket/?.?.?`). 4108 :type useragent: str 4109 :param headers: An optional mapping of additional HTTP headers to send during the WebSocket opening handshake. 4110 :type headers: dict 4111 :param proxy: Explicit proxy server to use (`hostname:port` or `IP:port`), e.g. `192.168.1.100:8080`. 4112 :type proxy: str 4113 :param debug: Debug mode (default: `False`). 4114 :type debug: bool 4115 :param debugCodePaths: Debug code paths mode (default: `False`). 4116 :type debugCodePaths: bool 4117 """ 4118 self.debug = debug 4119 self.debugCodePaths = debugCodePaths 4120 4121 self.logOctets = debug 4122 self.logFrames = debug 4123 4124 self.trackTimings = False 4125 4126 ## seed RNG which is used for WS opening handshake key and WS frame masks generation 4127 random.seed() 4128 4129 ## default WS session parameters 4130 ## 4131 self.setSessionParameters(url, origin, protocols, useragent, headers, proxy) 4132 4133 ## default WebSocket protocol options 4134 ## 4135 self.resetProtocolOptions() 4136 4137 4138 def setSessionParameters(self, 4139 url = None, 4140 origin = None, 4141 protocols = None, 4142 useragent = None, 4143 headers = None, 4144 proxy = None): 4145 """ 4146 Set WebSocket session parameters. 4147 4148 :param url: WebSocket URL this factory will connect to, e.g. `ws://myhost.com/somepath?param1=23`. 4149 For non-TCP transports like pipes or Unix domain sockets, provide `None`. 4150 This will use an implicit URL of `ws://localhost`. 4151 :type url: str 4152 :param origin: The origin to be sent in opening handshake. 4153 :type origin: str 4154 :param protocols: List of WebSocket subprotocols the client should announce in opening handshake. 4155 :type protocols: list of strings 4156 :param useragent: User agent as announced in HTTP request header during opening handshake. 4157 :type useragent: str 4158 :param headers: An optional mapping of additional HTTP headers to send during the WebSocket opening handshake. 4159 :type headers: dict 4160 """ 4161 ## parse WebSocket URI into components 4162 (isSecure, host, port, resource, path, params) = parseWsUrl(url or "ws://localhost") 4163 self.url = url 4164 self.isSecure = isSecure 4165 self.host = host 4166 self.port = port 4167 self.resource = resource 4168 self.path = path 4169 self.params = params 4170 4171 self.origin = origin 4172 self.protocols = protocols or [] 4173 self.useragent = useragent 4174 self.headers = headers or {} 4175 4176 self.proxy = proxy 4177 4178 4179 def resetProtocolOptions(self): 4180 """ 4181 Reset all WebSocket protocol options to defaults. 4182 """ 4183 self.version = WebSocketProtocol.DEFAULT_SPEC_VERSION 4184 self.allowHixie76 = WebSocketProtocol.DEFAULT_ALLOW_HIXIE76 4185 self.utf8validateIncoming = True 4186 self.acceptMaskedServerFrames = False 4187 self.maskClientFrames = True 4188 self.applyMask = True 4189 self.maxFramePayloadSize = 0 4190 self.maxMessagePayloadSize = 0 4191 self.autoFragmentSize = 0 4192 self.failByDrop = True 4193 self.echoCloseCodeReason = False 4194 self.serverConnectionDropTimeout = 1 4195 self.openHandshakeTimeout = 5 4196 self.closeHandshakeTimeout = 1 4197 self.tcpNoDelay = True 4198 4199 ## permessage-XXX extensions 4200 ## 4201 self.perMessageCompressionOffers = [] 4202 self.perMessageCompressionAccept = lambda _: None 4203 4204 4205 def setProtocolOptions(self, 4206 version = None, 4207 allowHixie76 = None, 4208 utf8validateIncoming = None, 4209 acceptMaskedServerFrames = None, 4210 maskClientFrames = None, 4211 applyMask = None, 4212 maxFramePayloadSize = None, 4213 maxMessagePayloadSize = None, 4214 autoFragmentSize = None, 4215 failByDrop = None, 4216 echoCloseCodeReason = None, 4217 serverConnectionDropTimeout = None, 4218 openHandshakeTimeout = None, 4219 closeHandshakeTimeout = None, 4220 tcpNoDelay = None, 4221 perMessageCompressionOffers = None, 4222 perMessageCompressionAccept = None): 4223 """ 4224 Set WebSocket protocol options used as defaults for _new_ protocol instances. 4225 4226 :param version: The WebSocket protocol spec (draft) version to be used (default: :func:`autobahn.websocket.protocol.WebSocketProtocol.SUPPORTED_PROTOCOL_VERSIONS`). 4227 :param utf8validateIncoming: Validate incoming UTF-8 in text message payloads (default: `True`). 4228 :type utf8validateIncoming: bool 4229 :param acceptMaskedServerFrames: Accept masked server-to-client frames (default: `False`). 4230 :type acceptMaskedServerFrames: bool 4231 :param maskClientFrames: Mask client-to-server frames (default: `True`). 4232 :type maskClientFrames: bool 4233 :param applyMask: Actually apply mask to payload when mask it present. Applies for outgoing and incoming frames (default: `True`). 4234 :type applyMask: bool 4235 :param maxFramePayloadSize: Maximum frame payload size that will be accepted when receiving or `0` for unlimited (default: `0`). 4236 :type maxFramePayloadSize: int 4237 :param maxMessagePayloadSize: Maximum message payload size (after reassembly of fragmented messages) that will be accepted when receiving or `0` for unlimited (default: `0`). 4238 :type maxMessagePayloadSize: int 4239 :param autoFragmentSize: Automatic fragmentation of outgoing data messages (when using the message-based API) into frames with payload length `<=` this size or `0` for no auto-fragmentation (default: `0`). 4240 :type autoFragmentSize: int 4241 :param failByDrop: Fail connections by dropping the TCP connection without performing closing handshake (default: `True`). 4242 :type failbyDrop: bool 4243 :param echoCloseCodeReason: Iff true, when receiving a close, echo back close code/reason. Otherwise reply with `code == 1000, reason = ""` (default: `False`). 4244 :type echoCloseCodeReason: bool 4245 :param serverConnectionDropTimeout: When the client expects the server to drop the TCP, timeout in seconds (default: `1`). 4246 :type serverConnectionDropTimeout: float 4247 :param openHandshakeTimeout: Opening WebSocket handshake timeout, timeout in seconds or `0` to deactivate (default: `0`). 4248 :type openHandshakeTimeout: float 4249 :param closeHandshakeTimeout: When we expect to receive a closing handshake reply, timeout in seconds (default: `1`). 4250 :type closeHandshakeTimeout: float 4251 :param tcpNoDelay: TCP NODELAY ("Nagle"): bool socket option (default: `True`). 4252 :type tcpNoDelay: bool 4253 :param perMessageCompressionOffers: A list of offers to provide to the server for the permessage-compress WebSocket extension. Must be a list of instances of subclass of PerMessageCompressOffer. 4254 :type perMessageCompressionOffers: list of instance of subclass of PerMessageCompressOffer 4255 :param perMessageCompressionAccept: Acceptor function for responses. 4256 :type perMessageCompressionAccept: callable 4257 """ 4258 if allowHixie76 is not None and allowHixie76 != self.allowHixie76: 4259 self.allowHixie76 = allowHixie76 4260 4261 if version is not None: 4262 if version not in WebSocketProtocol.SUPPORTED_SPEC_VERSIONS: 4263 raise Exception("invalid WebSocket draft version %s (allowed values: %s)" % (version, str(WebSocketProtocol.SUPPORTED_SPEC_VERSIONS))) 4264 if version == 0 and not self.allowHixie76: 4265 raise Exception("use of Hixie-76 requires allowHixie76 == True") 4266 if version != self.version: 4267 self.version = version 4268 4269 if utf8validateIncoming is not None and utf8validateIncoming != self.utf8validateIncoming: 4270 self.utf8validateIncoming = utf8validateIncoming 4271 4272 if acceptMaskedServerFrames is not None and acceptMaskedServerFrames != self.acceptMaskedServerFrames: 4273 self.acceptMaskedServerFrames = acceptMaskedServerFrames 4274 4275 if maskClientFrames is not None and maskClientFrames != self.maskClientFrames: 4276 self.maskClientFrames = maskClientFrames 4277 4278 if applyMask is not None and applyMask != self.applyMask: 4279 self.applyMask = applyMask 4280 4281 if maxFramePayloadSize is not None and maxFramePayloadSize != self.maxFramePayloadSize: 4282 self.maxFramePayloadSize = maxFramePayloadSize 4283 4284 if maxMessagePayloadSize is not None and maxMessagePayloadSize != self.maxMessagePayloadSize: 4285 self.maxMessagePayloadSize = maxMessagePayloadSize 4286 4287 if autoFragmentSize is not None and autoFragmentSize != self.autoFragmentSize: 4288 self.autoFragmentSize = autoFragmentSize 4289 4290 if failByDrop is not None and failByDrop != self.failByDrop: 4291 self.failByDrop = failByDrop 4292 4293 if echoCloseCodeReason is not None and echoCloseCodeReason != self.echoCloseCodeReason: 4294 self.echoCloseCodeReason = echoCloseCodeReason 4295 4296 if serverConnectionDropTimeout is not None and serverConnectionDropTimeout != self.serverConnectionDropTimeout: 4297 self.serverConnectionDropTimeout = serverConnectionDropTimeout 4298 4299 if openHandshakeTimeout is not None and openHandshakeTimeout != self.openHandshakeTimeout: 4300 self.openHandshakeTimeout = openHandshakeTimeout 4301 4302 if closeHandshakeTimeout is not None and closeHandshakeTimeout != self.closeHandshakeTimeout: 4303 self.closeHandshakeTimeout = closeHandshakeTimeout 4304 4305 if tcpNoDelay is not None and tcpNoDelay != self.tcpNoDelay: 4306 self.tcpNoDelay = tcpNoDelay 4307 4308 if perMessageCompressionOffers is not None and pickle.dumps(perMessageCompressionOffers) != pickle.dumps(self.perMessageCompressionOffers): 4309 if type(perMessageCompressionOffers) == list: 4310 ## 4311 ## FIXME: more rigorous verification of passed argument 4312 ## 4313 self.perMessageCompressionOffers = copy.deepcopy(perMessageCompressionOffers) 4314 else: 4315 raise Exception("invalid type %s for perMessageCompressionOffers - expected list" % type(perMessageCompressionOffers)) 4316 4317 if perMessageCompressionAccept is not None and perMessageCompressionAccept != self.perMessageCompressionAccept: 4318 self.perMessageCompressionAccept = perMessageCompressionAccept 4319