1 /* This Source Code Form is subject to the terms of the Mozilla Public
2  * License, v. 2.0. If a copy of the MPL was not distributed with this
3  * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4 
5 use dom::bindings::cell::DomRefCell;
6 use dom::bindings::codegen::Bindings::BlobBinding::BlobMethods;
7 use dom::bindings::codegen::Bindings::WebSocketBinding;
8 use dom::bindings::codegen::Bindings::WebSocketBinding::{BinaryType, WebSocketMethods};
9 use dom::bindings::codegen::UnionTypes::StringOrStringSequence;
10 use dom::bindings::conversions::ToJSValConvertible;
11 use dom::bindings::error::{Error, ErrorResult, Fallible};
12 use dom::bindings::inheritance::Castable;
13 use dom::bindings::refcounted::Trusted;
14 use dom::bindings::reflector::{DomObject, reflect_dom_object};
15 use dom::bindings::root::DomRoot;
16 use dom::bindings::str::{DOMString, USVString, is_token};
17 use dom::blob::{Blob, BlobImpl};
18 use dom::closeevent::CloseEvent;
19 use dom::event::{Event, EventBubbles, EventCancelable};
20 use dom::eventtarget::EventTarget;
21 use dom::globalscope::GlobalScope;
22 use dom::messageevent::MessageEvent;
23 use dom_struct::dom_struct;
24 use ipc_channel::ipc::{self, IpcReceiver, IpcSender};
25 use js::jsapi::{JSAutoCompartment, JSObject};
26 use js::jsval::UndefinedValue;
27 use js::typedarray::{ArrayBuffer, CreateWith};
28 use net_traits::{CoreResourceMsg, FetchChannels};
29 use net_traits::{WebSocketDomAction, WebSocketNetworkEvent};
30 use net_traits::MessageData;
31 use net_traits::request::{RequestInit, RequestMode};
32 use script_runtime::CommonScriptMsg;
33 use script_runtime::ScriptThreadEventCategory::WebSocketEvent;
34 use servo_url::ServoUrl;
35 use std::borrow::ToOwned;
36 use std::cell::Cell;
37 use std::ptr;
38 use std::thread;
39 use task::{TaskOnce, TaskCanceller};
40 use task_source::TaskSource;
41 use task_source::networking::NetworkingTaskSource;
42 
43 #[derive(Clone, Copy, Debug, JSTraceable, MallocSizeOf, PartialEq)]
44 enum WebSocketRequestState {
45     Connecting = 0,
46     Open = 1,
47     Closing = 2,
48     Closed = 3,
49 }
50 
51 // Close codes defined in https://tools.ietf.org/html/rfc6455#section-7.4.1
52 // Names are from https://github.com/mozilla/gecko-dev/blob/master/netwerk/protocol/websocket/nsIWebSocketChannel.idl
53 #[allow(dead_code)]
54 mod close_code {
55     pub const NORMAL: u16 = 1000;
56     pub const GOING_AWAY: u16 = 1001;
57     pub const PROTOCOL_ERROR: u16 = 1002;
58     pub const UNSUPPORTED_DATATYPE: u16 = 1003;
59     pub const NO_STATUS: u16 = 1005;
60     pub const ABNORMAL: u16 = 1006;
61     pub const INVALID_PAYLOAD: u16 = 1007;
62     pub const POLICY_VIOLATION: u16 = 1008;
63     pub const TOO_LARGE: u16 = 1009;
64     pub const EXTENSION_MISSING: u16 = 1010;
65     pub const INTERNAL_ERROR: u16 = 1011;
66     pub const TLS_FAILED: u16 = 1015;
67 }
68 
close_the_websocket_connection( address: Trusted<WebSocket>, task_source: &NetworkingTaskSource, canceller: &TaskCanceller, code: Option<u16>, reason: String, )69 pub fn close_the_websocket_connection(
70     address: Trusted<WebSocket>,
71     task_source: &NetworkingTaskSource,
72     canceller: &TaskCanceller,
73     code: Option<u16>,
74     reason: String,
75 ) {
76     let close_task = CloseTask {
77         address: address,
78         failed: false,
79         code: code,
80         reason: Some(reason),
81     };
82     task_source.queue_with_canceller(close_task, &canceller).unwrap();
83 }
84 
fail_the_websocket_connection( address: Trusted<WebSocket>, task_source: &NetworkingTaskSource, canceller: &TaskCanceller, )85 pub fn fail_the_websocket_connection(
86     address: Trusted<WebSocket>,
87     task_source: &NetworkingTaskSource,
88     canceller: &TaskCanceller,
89 ) {
90     let close_task = CloseTask {
91         address: address,
92         failed: true,
93         code: Some(close_code::ABNORMAL),
94         reason: None,
95     };
96     task_source.queue_with_canceller(close_task, &canceller).unwrap();
97 }
98 
99 #[dom_struct]
100 pub struct WebSocket {
101     eventtarget: EventTarget,
102     url: ServoUrl,
103     ready_state: Cell<WebSocketRequestState>,
104     buffered_amount: Cell<u64>,
105     clearing_buffer: Cell<bool>, //Flag to tell if there is a running thread to clear buffered_amount
106     #[ignore_malloc_size_of = "Defined in std"]
107     sender: DomRefCell<Option<IpcSender<WebSocketDomAction>>>,
108     binary_type: Cell<BinaryType>,
109     protocol: DomRefCell<String>, //Subprotocol selected by server
110 }
111 
112 impl WebSocket {
new_inherited(url: ServoUrl) -> WebSocket113     fn new_inherited(url: ServoUrl) -> WebSocket {
114         WebSocket {
115             eventtarget: EventTarget::new_inherited(),
116             url: url,
117             ready_state: Cell::new(WebSocketRequestState::Connecting),
118             buffered_amount: Cell::new(0),
119             clearing_buffer: Cell::new(false),
120             sender: DomRefCell::new(None),
121             binary_type: Cell::new(BinaryType::Blob),
122             protocol: DomRefCell::new("".to_owned()),
123         }
124     }
125 
new(global: &GlobalScope, url: ServoUrl) -> DomRoot<WebSocket>126     fn new(global: &GlobalScope, url: ServoUrl) -> DomRoot<WebSocket> {
127         reflect_dom_object(Box::new(WebSocket::new_inherited(url)),
128                            global, WebSocketBinding::Wrap)
129     }
130 
131     /// <https://html.spec.whatwg.org/multipage/#dom-websocket>
Constructor(global: &GlobalScope, url: DOMString, protocols: Option<StringOrStringSequence>) -> Fallible<DomRoot<WebSocket>>132     pub fn Constructor(global: &GlobalScope,
133                        url: DOMString,
134                        protocols: Option<StringOrStringSequence>)
135                        -> Fallible<DomRoot<WebSocket>> {
136         // Steps 1-2.
137         let url_record = ServoUrl::parse(&url).or(Err(Error::Syntax))?;
138 
139         // Step 3.
140         match url_record.scheme() {
141             "ws" | "wss" => {},
142             _ => return Err(Error::Syntax),
143         }
144 
145         // Step 4.
146         if url_record.fragment().is_some() {
147             return Err(Error::Syntax);
148         }
149 
150         // Step 5.
151         let protocols = protocols.map_or(vec![], |p| {
152             match p {
153                 StringOrStringSequence::String(string) => vec![string.into()],
154                 StringOrStringSequence::StringSequence(seq) => {
155                     seq.into_iter().map(String::from).collect()
156                 },
157             }
158         });
159 
160         // Step 6.
161         for (i, protocol) in protocols.iter().enumerate() {
162             // https://tools.ietf.org/html/rfc6455#section-4.1
163             // Handshake requirements, step 10
164 
165             if protocols[i + 1..].iter().any(|p| p.eq_ignore_ascii_case(protocol)) {
166                 return Err(Error::Syntax);
167             }
168 
169             // https://tools.ietf.org/html/rfc6455#section-4.1
170             if !is_token(protocol.as_bytes()) {
171                 return Err(Error::Syntax);
172             }
173         }
174 
175         let ws = WebSocket::new(global, url_record.clone());
176         let address = Trusted::new(&*ws);
177 
178         // Create the interface for communication with the resource thread
179         let (dom_action_sender, resource_action_receiver):
180                 (IpcSender<WebSocketDomAction>,
181                 IpcReceiver<WebSocketDomAction>) = ipc::channel().unwrap();
182         let (resource_event_sender, dom_event_receiver):
183                 (IpcSender<WebSocketNetworkEvent>,
184                 IpcReceiver<WebSocketNetworkEvent>) = ipc::channel().unwrap();
185 
186         // Step 8.
187         let request = RequestInit {
188             url: url_record,
189             origin: global.origin().immutable().clone(),
190             mode: RequestMode::WebSocket { protocols },
191             ..RequestInit::default()
192         };
193         let channels = FetchChannels::WebSocket {
194             event_sender: resource_event_sender,
195             action_receiver: resource_action_receiver,
196         };
197         let _ = global.core_resource_thread().send(CoreResourceMsg::Fetch(request, channels));
198 
199         *ws.sender.borrow_mut() = Some(dom_action_sender);
200 
201         let task_source = global.networking_task_source();
202         let canceller = global.task_canceller();
203         thread::spawn(move || {
204             while let Ok(event) = dom_event_receiver.recv() {
205                 match event {
206                     WebSocketNetworkEvent::ConnectionEstablished { protocol_in_use } => {
207                         let open_thread = ConnectionEstablishedTask {
208                             address: address.clone(),
209                             protocol_in_use,
210                         };
211                         task_source.queue_with_canceller(open_thread, &canceller).unwrap();
212                     },
213                     WebSocketNetworkEvent::MessageReceived(message) => {
214                         let message_thread = MessageReceivedTask {
215                             address: address.clone(),
216                             message: message,
217                         };
218                         task_source.queue_with_canceller(message_thread, &canceller).unwrap();
219                     },
220                     WebSocketNetworkEvent::Fail => {
221                         fail_the_websocket_connection(address.clone(),
222                             &task_source, &canceller);
223                     },
224                     WebSocketNetworkEvent::Close(code, reason) => {
225                         close_the_websocket_connection(address.clone(),
226                             &task_source, &canceller, code, reason);
227                     },
228                 }
229             }
230         });
231 
232         // Step 7.
233         Ok(ws)
234     }
235 
236     // https://html.spec.whatwg.org/multipage/#dom-websocket-send
send_impl(&self, data_byte_len: u64) -> Fallible<bool>237     fn send_impl(&self, data_byte_len: u64) -> Fallible<bool> {
238         let return_after_buffer = match self.ready_state.get() {
239             WebSocketRequestState::Connecting => {
240                 return Err(Error::InvalidState);
241             },
242             WebSocketRequestState::Open => false,
243             WebSocketRequestState::Closing | WebSocketRequestState::Closed => true,
244         };
245 
246         let address = Trusted::new(self);
247 
248         match data_byte_len.checked_add(self.buffered_amount.get()) {
249             None => panic!(),
250             Some(new_amount) => self.buffered_amount.set(new_amount)
251         };
252 
253         if return_after_buffer {
254             return Ok(false);
255         }
256 
257         if !self.clearing_buffer.get() && self.ready_state.get() == WebSocketRequestState::Open {
258             self.clearing_buffer.set(true);
259 
260             let task = Box::new(BufferedAmountTask {
261                 address: address,
262             });
263 
264             let pipeline_id = self.global().pipeline_id();
265             self.global()
266                 .script_chan()
267                 .send(CommonScriptMsg::Task(WebSocketEvent, task, Some(pipeline_id)))
268                 .unwrap();
269         }
270 
271         Ok(true)
272     }
273 }
274 
275 impl WebSocketMethods for WebSocket {
276     // https://html.spec.whatwg.org/multipage/#handler-websocket-onopen
277     event_handler!(open, GetOnopen, SetOnopen);
278 
279     // https://html.spec.whatwg.org/multipage/#handler-websocket-onclose
280     event_handler!(close, GetOnclose, SetOnclose);
281 
282     // https://html.spec.whatwg.org/multipage/#handler-websocket-onerror
283     event_handler!(error, GetOnerror, SetOnerror);
284 
285     // https://html.spec.whatwg.org/multipage/#handler-websocket-onmessage
286     event_handler!(message, GetOnmessage, SetOnmessage);
287 
288     // https://html.spec.whatwg.org/multipage/#dom-websocket-url
Url(&self) -> DOMString289     fn Url(&self) -> DOMString {
290         DOMString::from(self.url.as_str())
291     }
292 
293     // https://html.spec.whatwg.org/multipage/#dom-websocket-readystate
ReadyState(&self) -> u16294     fn ReadyState(&self) -> u16 {
295         self.ready_state.get() as u16
296     }
297 
298     // https://html.spec.whatwg.org/multipage/#dom-websocket-bufferedamount
BufferedAmount(&self) -> u64299     fn BufferedAmount(&self) -> u64 {
300         self.buffered_amount.get()
301     }
302 
303     // https://html.spec.whatwg.org/multipage/#dom-websocket-binarytype
BinaryType(&self) -> BinaryType304     fn BinaryType(&self) -> BinaryType {
305         self.binary_type.get()
306     }
307 
308     // https://html.spec.whatwg.org/multipage/#dom-websocket-binarytype
SetBinaryType(&self, btype: BinaryType)309     fn SetBinaryType(&self, btype: BinaryType) {
310         self.binary_type.set(btype)
311     }
312 
313     // https://html.spec.whatwg.org/multipage/#dom-websocket-protocol
Protocol(&self) -> DOMString314     fn Protocol(&self) -> DOMString {
315          DOMString::from(self.protocol.borrow().clone())
316     }
317 
318     // https://html.spec.whatwg.org/multipage/#dom-websocket-send
Send(&self, data: USVString) -> ErrorResult319     fn Send(&self, data: USVString) -> ErrorResult {
320         let data_byte_len = data.0.as_bytes().len() as u64;
321         let send_data = self.send_impl(data_byte_len)?;
322 
323         if send_data {
324             let mut other_sender = self.sender.borrow_mut();
325             let my_sender = other_sender.as_mut().unwrap();
326             let _ = my_sender.send(WebSocketDomAction::SendMessage(MessageData::Text(data.0)));
327         }
328 
329         Ok(())
330     }
331 
332     // https://html.spec.whatwg.org/multipage/#dom-websocket-send
Send_(&self, blob: &Blob) -> ErrorResult333     fn Send_(&self, blob: &Blob) -> ErrorResult {
334         /* As per https://html.spec.whatwg.org/multipage/#websocket
335            the buffered amount needs to be clamped to u32, even though Blob.Size() is u64
336            If the buffer limit is reached in the first place, there are likely other major problems
337         */
338         let data_byte_len = blob.Size();
339         let send_data = self.send_impl(data_byte_len)?;
340 
341         if send_data {
342             let mut other_sender = self.sender.borrow_mut();
343             let my_sender = other_sender.as_mut().unwrap();
344             let bytes = blob.get_bytes().unwrap_or(vec![]);
345             let _ = my_sender.send(WebSocketDomAction::SendMessage(MessageData::Binary(bytes)));
346         }
347 
348         Ok(())
349     }
350 
351     // https://html.spec.whatwg.org/multipage/#dom-websocket-close
Close(&self, code: Option<u16>, reason: Option<USVString>) -> ErrorResult352     fn Close(&self, code: Option<u16>, reason: Option<USVString>) -> ErrorResult {
353         if let Some(code) = code {
354             //Fail if the supplied code isn't normal and isn't reserved for libraries, frameworks, and applications
355             if code != close_code::NORMAL && (code < 3000 || code > 4999) {
356                 return Err(Error::InvalidAccess);
357             }
358         }
359         if let Some(ref reason) = reason {
360             if reason.0.as_bytes().len() > 123 { //reason cannot be larger than 123 bytes
361                 return Err(Error::Syntax);
362             }
363         }
364 
365         match self.ready_state.get() {
366             WebSocketRequestState::Closing | WebSocketRequestState::Closed  => {} //Do nothing
367             WebSocketRequestState::Connecting => { //Connection is not yet established
368                 /*By setting the state to closing, the open function
369                   will abort connecting the websocket*/
370                 self.ready_state.set(WebSocketRequestState::Closing);
371 
372                 let address = Trusted::new(self);
373                 let task_source = self.global().networking_task_source();
374                 fail_the_websocket_connection(address, &task_source, &self.global().task_canceller());
375             }
376             WebSocketRequestState::Open => {
377                 self.ready_state.set(WebSocketRequestState::Closing);
378 
379                 // Kick off _Start the WebSocket Closing Handshake_
380                 // https://tools.ietf.org/html/rfc6455#section-7.1.2
381                 let reason = reason.map(|reason| reason.0);
382                 let mut other_sender = self.sender.borrow_mut();
383                 let my_sender = other_sender.as_mut().unwrap();
384                 let _ = my_sender.send(WebSocketDomAction::Close(code, reason));
385             }
386         }
387         Ok(()) //Return Ok
388     }
389 }
390 
391 
392 /// Task queued when *the WebSocket connection is established*.
393 /// <https://html.spec.whatwg.org/multipage/#feedback-from-the-protocol:concept-websocket-established>
394 struct ConnectionEstablishedTask {
395     address: Trusted<WebSocket>,
396     protocol_in_use: Option<String>,
397 }
398 
399 impl TaskOnce for ConnectionEstablishedTask {
400     /// <https://html.spec.whatwg.org/multipage/#feedback-from-the-protocol:concept-websocket-established>
run_once(self)401     fn run_once(self) {
402         let ws = self.address.root();
403 
404         // Step 1.
405         ws.ready_state.set(WebSocketRequestState::Open);
406 
407         // Step 2: Extensions.
408         // TODO: Set extensions to extensions in use.
409 
410         // Step 3.
411         if let Some(protocol_name) = self.protocol_in_use {
412             *ws.protocol.borrow_mut() = protocol_name;
413         };
414 
415         // Step 4.
416         ws.upcast().fire_event(atom!("open"));
417     }
418 }
419 
420 struct BufferedAmountTask {
421     address: Trusted<WebSocket>,
422 }
423 
424 impl TaskOnce for BufferedAmountTask {
425     // See https://html.spec.whatwg.org/multipage/#dom-websocket-bufferedamount
426     //
427     // To be compliant with standards, we need to reset bufferedAmount only when the event loop
428     // reaches step 1.  In our implementation, the bytes will already have been sent on a background
429     // thread.
run_once(self)430     fn run_once(self) {
431         let ws = self.address.root();
432 
433         ws.buffered_amount.set(0);
434         ws.clearing_buffer.set(false);
435     }
436 }
437 
438 struct CloseTask {
439     address: Trusted<WebSocket>,
440     failed: bool,
441     code: Option<u16>,
442     reason: Option<String>,
443 }
444 
445 impl TaskOnce for CloseTask {
run_once(self)446     fn run_once(self) {
447         let ws = self.address.root();
448 
449         if ws.ready_state.get() == WebSocketRequestState::Closed {
450             // Do nothing if already closed.
451             return;
452         }
453 
454         // Perform _the WebSocket connection is closed_ steps.
455         // https://html.spec.whatwg.org/multipage/#closeWebSocket
456 
457         // Step 1.
458         ws.ready_state.set(WebSocketRequestState::Closed);
459 
460         // Step 2.
461         if self.failed {
462             ws.upcast().fire_event(atom!("error"));
463         }
464 
465         // Step 3.
466         let clean_close = !self.failed;
467         let code = self.code.unwrap_or(close_code::NO_STATUS);
468         let reason = DOMString::from(self.reason.unwrap_or("".to_owned()));
469         let close_event = CloseEvent::new(&ws.global(),
470                                           atom!("close"),
471                                           EventBubbles::DoesNotBubble,
472                                           EventCancelable::NotCancelable,
473                                           clean_close,
474                                           code,
475                                           reason);
476         close_event.upcast::<Event>().fire(ws.upcast());
477     }
478 }
479 
480 struct MessageReceivedTask {
481     address: Trusted<WebSocket>,
482     message: MessageData,
483 }
484 
485 impl TaskOnce for MessageReceivedTask {
486     #[allow(unsafe_code)]
run_once(self)487     fn run_once(self) {
488         let ws = self.address.root();
489         debug!("MessageReceivedTask::handler({:p}): readyState={:?}", &*ws,
490                ws.ready_state.get());
491 
492         // Step 1.
493         if ws.ready_state.get() != WebSocketRequestState::Open {
494             return;
495         }
496 
497         // Step 2-5.
498         let global = ws.global();
499         // global.get_cx() returns a valid `JSContext` pointer, so this is safe.
500         unsafe {
501             let cx = global.get_cx();
502             let _ac = JSAutoCompartment::new(cx, ws.reflector().get_jsobject().get());
503             rooted!(in(cx) let mut message = UndefinedValue());
504             match self.message {
505                 MessageData::Text(text) => text.to_jsval(cx, message.handle_mut()),
506                 MessageData::Binary(data) => {
507                     match ws.binary_type.get() {
508                         BinaryType::Blob => {
509                             let blob = Blob::new(&global, BlobImpl::new_from_bytes(data), "".to_owned());
510                             blob.to_jsval(cx, message.handle_mut());
511                         }
512                         BinaryType::Arraybuffer => {
513                             rooted!(in(cx) let mut array_buffer = ptr::null_mut::<JSObject>());
514                             assert!(ArrayBuffer::create(cx,
515                                                         CreateWith::Slice(&data),
516                                                         array_buffer.handle_mut())
517                                     .is_ok());
518 
519                             (*array_buffer).to_jsval(cx, message.handle_mut());
520                         }
521 
522                     }
523                 },
524             }
525             MessageEvent::dispatch_jsval(ws.upcast(), &global, message.handle());
526         }
527     }
528 }
529