1 /* This Source Code Form is subject to the terms of the Mozilla Public
2 * License, v. 2.0. If a copy of the MPL was not distributed with this
3 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
4
5 use dom::bindings::cell::DomRefCell;
6 use dom::bindings::codegen::Bindings::BlobBinding::BlobMethods;
7 use dom::bindings::codegen::Bindings::WebSocketBinding;
8 use dom::bindings::codegen::Bindings::WebSocketBinding::{BinaryType, WebSocketMethods};
9 use dom::bindings::codegen::UnionTypes::StringOrStringSequence;
10 use dom::bindings::conversions::ToJSValConvertible;
11 use dom::bindings::error::{Error, ErrorResult, Fallible};
12 use dom::bindings::inheritance::Castable;
13 use dom::bindings::refcounted::Trusted;
14 use dom::bindings::reflector::{DomObject, reflect_dom_object};
15 use dom::bindings::root::DomRoot;
16 use dom::bindings::str::{DOMString, USVString, is_token};
17 use dom::blob::{Blob, BlobImpl};
18 use dom::closeevent::CloseEvent;
19 use dom::event::{Event, EventBubbles, EventCancelable};
20 use dom::eventtarget::EventTarget;
21 use dom::globalscope::GlobalScope;
22 use dom::messageevent::MessageEvent;
23 use dom_struct::dom_struct;
24 use ipc_channel::ipc::{self, IpcReceiver, IpcSender};
25 use js::jsapi::{JSAutoCompartment, JSObject};
26 use js::jsval::UndefinedValue;
27 use js::typedarray::{ArrayBuffer, CreateWith};
28 use net_traits::{CoreResourceMsg, FetchChannels};
29 use net_traits::{WebSocketDomAction, WebSocketNetworkEvent};
30 use net_traits::MessageData;
31 use net_traits::request::{RequestInit, RequestMode};
32 use script_runtime::CommonScriptMsg;
33 use script_runtime::ScriptThreadEventCategory::WebSocketEvent;
34 use servo_url::ServoUrl;
35 use std::borrow::ToOwned;
36 use std::cell::Cell;
37 use std::ptr;
38 use std::thread;
39 use task::{TaskOnce, TaskCanceller};
40 use task_source::TaskSource;
41 use task_source::networking::NetworkingTaskSource;
42
/// Connection state of a `WebSocket`.
///
/// The explicit discriminants are the numeric values handed back to script:
/// `WebSocket::ReadyState` returns the current state cast to `u16`.
#[derive(Clone, Copy, Debug, JSTraceable, MallocSizeOf, PartialEq)]
enum WebSocketRequestState {
    /// Initial state, set by `WebSocket::new_inherited`.
    Connecting = 0,
    /// Set by `ConnectionEstablishedTask`.
    Open = 1,
    /// Set by `WebSocket::Close` when called on a connecting or open socket.
    Closing = 2,
    /// Set by `CloseTask`.
    Closed = 3,
}
50
// Close codes defined in https://tools.ietf.org/html/rfc6455#section-7.4.1
// Names are from https://github.com/mozilla/gecko-dev/blob/master/netwerk/protocol/websocket/nsIWebSocketChannel.idl
//
// Not every code is referenced in this file, hence the `dead_code` allowance.
#[allow(dead_code)]
mod close_code {
    /// Normal closure.
    pub const NORMAL: u16 = 1000;
    /// The endpoint is going away.
    pub const GOING_AWAY: u16 = 1001;
    /// The connection is terminated because of a protocol error.
    pub const PROTOCOL_ERROR: u16 = 1002;
    /// The endpoint received a type of data it cannot accept.
    pub const UNSUPPORTED_DATATYPE: u16 = 1003;

    /// No status code was present (reserved value).
    pub const NO_STATUS: u16 = 1005;
    /// The connection was closed abnormally (reserved value).
    pub const ABNORMAL: u16 = 1006;

    /// The message payload was not consistent with the message type.
    pub const INVALID_PAYLOAD: u16 = 1007;
    /// A message violated the endpoint's policy.
    pub const POLICY_VIOLATION: u16 = 1008;
    /// A message was too big to process.
    pub const TOO_LARGE: u16 = 1009;
    /// The client expected an extension the server did not negotiate.
    pub const EXTENSION_MISSING: u16 = 1010;
    /// The server hit an unexpected condition.
    pub const INTERNAL_ERROR: u16 = 1011;

    /// The TLS handshake failed (reserved value).
    pub const TLS_FAILED: u16 = 1015;
}
68
close_the_websocket_connection( address: Trusted<WebSocket>, task_source: &NetworkingTaskSource, canceller: &TaskCanceller, code: Option<u16>, reason: String, )69 pub fn close_the_websocket_connection(
70 address: Trusted<WebSocket>,
71 task_source: &NetworkingTaskSource,
72 canceller: &TaskCanceller,
73 code: Option<u16>,
74 reason: String,
75 ) {
76 let close_task = CloseTask {
77 address: address,
78 failed: false,
79 code: code,
80 reason: Some(reason),
81 };
82 task_source.queue_with_canceller(close_task, &canceller).unwrap();
83 }
84
fail_the_websocket_connection( address: Trusted<WebSocket>, task_source: &NetworkingTaskSource, canceller: &TaskCanceller, )85 pub fn fail_the_websocket_connection(
86 address: Trusted<WebSocket>,
87 task_source: &NetworkingTaskSource,
88 canceller: &TaskCanceller,
89 ) {
90 let close_task = CloseTask {
91 address: address,
92 failed: true,
93 code: Some(close_code::ABNORMAL),
94 reason: None,
95 };
96 task_source.queue_with_canceller(close_task, &canceller).unwrap();
97 }
98
/// DOM object backing the script-visible `WebSocket` interface.
#[dom_struct]
pub struct WebSocket {
    /// Parent `EventTarget` on which the socket's events are fired.
    eventtarget: EventTarget,
    /// URL the socket was constructed with; returned by `Url`.
    url: ServoUrl,
    /// Current connection state; returned (as a number) by `ReadyState`.
    ready_state: Cell<WebSocketRequestState>,
    /// Number of bytes counted by `send_impl` since the last `BufferedAmountTask` ran.
    buffered_amount: Cell<u64>,
    clearing_buffer: Cell<bool>, //Flag to tell if there is a running thread to clear buffered_amount
    /// Channel for sending actions to the resource thread; `None` until
    /// `Constructor` installs it.
    #[ignore_malloc_size_of = "Defined in std"]
    sender: DomRefCell<Option<IpcSender<WebSocketDomAction>>>,
    /// How binary messages are exposed to script (`Blob` by default).
    binary_type: Cell<BinaryType>,
    protocol: DomRefCell<String>, //Subprotocol selected by server
}
111
112 impl WebSocket {
new_inherited(url: ServoUrl) -> WebSocket113 fn new_inherited(url: ServoUrl) -> WebSocket {
114 WebSocket {
115 eventtarget: EventTarget::new_inherited(),
116 url: url,
117 ready_state: Cell::new(WebSocketRequestState::Connecting),
118 buffered_amount: Cell::new(0),
119 clearing_buffer: Cell::new(false),
120 sender: DomRefCell::new(None),
121 binary_type: Cell::new(BinaryType::Blob),
122 protocol: DomRefCell::new("".to_owned()),
123 }
124 }
125
new(global: &GlobalScope, url: ServoUrl) -> DomRoot<WebSocket>126 fn new(global: &GlobalScope, url: ServoUrl) -> DomRoot<WebSocket> {
127 reflect_dom_object(Box::new(WebSocket::new_inherited(url)),
128 global, WebSocketBinding::Wrap)
129 }
130
131 /// <https://html.spec.whatwg.org/multipage/#dom-websocket>
Constructor(global: &GlobalScope, url: DOMString, protocols: Option<StringOrStringSequence>) -> Fallible<DomRoot<WebSocket>>132 pub fn Constructor(global: &GlobalScope,
133 url: DOMString,
134 protocols: Option<StringOrStringSequence>)
135 -> Fallible<DomRoot<WebSocket>> {
136 // Steps 1-2.
137 let url_record = ServoUrl::parse(&url).or(Err(Error::Syntax))?;
138
139 // Step 3.
140 match url_record.scheme() {
141 "ws" | "wss" => {},
142 _ => return Err(Error::Syntax),
143 }
144
145 // Step 4.
146 if url_record.fragment().is_some() {
147 return Err(Error::Syntax);
148 }
149
150 // Step 5.
151 let protocols = protocols.map_or(vec![], |p| {
152 match p {
153 StringOrStringSequence::String(string) => vec![string.into()],
154 StringOrStringSequence::StringSequence(seq) => {
155 seq.into_iter().map(String::from).collect()
156 },
157 }
158 });
159
160 // Step 6.
161 for (i, protocol) in protocols.iter().enumerate() {
162 // https://tools.ietf.org/html/rfc6455#section-4.1
163 // Handshake requirements, step 10
164
165 if protocols[i + 1..].iter().any(|p| p.eq_ignore_ascii_case(protocol)) {
166 return Err(Error::Syntax);
167 }
168
169 // https://tools.ietf.org/html/rfc6455#section-4.1
170 if !is_token(protocol.as_bytes()) {
171 return Err(Error::Syntax);
172 }
173 }
174
175 let ws = WebSocket::new(global, url_record.clone());
176 let address = Trusted::new(&*ws);
177
178 // Create the interface for communication with the resource thread
179 let (dom_action_sender, resource_action_receiver):
180 (IpcSender<WebSocketDomAction>,
181 IpcReceiver<WebSocketDomAction>) = ipc::channel().unwrap();
182 let (resource_event_sender, dom_event_receiver):
183 (IpcSender<WebSocketNetworkEvent>,
184 IpcReceiver<WebSocketNetworkEvent>) = ipc::channel().unwrap();
185
186 // Step 8.
187 let request = RequestInit {
188 url: url_record,
189 origin: global.origin().immutable().clone(),
190 mode: RequestMode::WebSocket { protocols },
191 ..RequestInit::default()
192 };
193 let channels = FetchChannels::WebSocket {
194 event_sender: resource_event_sender,
195 action_receiver: resource_action_receiver,
196 };
197 let _ = global.core_resource_thread().send(CoreResourceMsg::Fetch(request, channels));
198
199 *ws.sender.borrow_mut() = Some(dom_action_sender);
200
201 let task_source = global.networking_task_source();
202 let canceller = global.task_canceller();
203 thread::spawn(move || {
204 while let Ok(event) = dom_event_receiver.recv() {
205 match event {
206 WebSocketNetworkEvent::ConnectionEstablished { protocol_in_use } => {
207 let open_thread = ConnectionEstablishedTask {
208 address: address.clone(),
209 protocol_in_use,
210 };
211 task_source.queue_with_canceller(open_thread, &canceller).unwrap();
212 },
213 WebSocketNetworkEvent::MessageReceived(message) => {
214 let message_thread = MessageReceivedTask {
215 address: address.clone(),
216 message: message,
217 };
218 task_source.queue_with_canceller(message_thread, &canceller).unwrap();
219 },
220 WebSocketNetworkEvent::Fail => {
221 fail_the_websocket_connection(address.clone(),
222 &task_source, &canceller);
223 },
224 WebSocketNetworkEvent::Close(code, reason) => {
225 close_the_websocket_connection(address.clone(),
226 &task_source, &canceller, code, reason);
227 },
228 }
229 }
230 });
231
232 // Step 7.
233 Ok(ws)
234 }
235
236 // https://html.spec.whatwg.org/multipage/#dom-websocket-send
send_impl(&self, data_byte_len: u64) -> Fallible<bool>237 fn send_impl(&self, data_byte_len: u64) -> Fallible<bool> {
238 let return_after_buffer = match self.ready_state.get() {
239 WebSocketRequestState::Connecting => {
240 return Err(Error::InvalidState);
241 },
242 WebSocketRequestState::Open => false,
243 WebSocketRequestState::Closing | WebSocketRequestState::Closed => true,
244 };
245
246 let address = Trusted::new(self);
247
248 match data_byte_len.checked_add(self.buffered_amount.get()) {
249 None => panic!(),
250 Some(new_amount) => self.buffered_amount.set(new_amount)
251 };
252
253 if return_after_buffer {
254 return Ok(false);
255 }
256
257 if !self.clearing_buffer.get() && self.ready_state.get() == WebSocketRequestState::Open {
258 self.clearing_buffer.set(true);
259
260 let task = Box::new(BufferedAmountTask {
261 address: address,
262 });
263
264 let pipeline_id = self.global().pipeline_id();
265 self.global()
266 .script_chan()
267 .send(CommonScriptMsg::Task(WebSocketEvent, task, Some(pipeline_id)))
268 .unwrap();
269 }
270
271 Ok(true)
272 }
273 }
274
275 impl WebSocketMethods for WebSocket {
// Event handler attributes. Each `event_handler!` invocation names the event
// and its getter/setter pair; presumably the macro expands to those two
// accessor methods (its definition is not visible in this file).

// https://html.spec.whatwg.org/multipage/#handler-websocket-onopen
event_handler!(open, GetOnopen, SetOnopen);

// https://html.spec.whatwg.org/multipage/#handler-websocket-onclose
event_handler!(close, GetOnclose, SetOnclose);

// https://html.spec.whatwg.org/multipage/#handler-websocket-onerror
event_handler!(error, GetOnerror, SetOnerror);

// https://html.spec.whatwg.org/multipage/#handler-websocket-onmessage
event_handler!(message, GetOnmessage, SetOnmessage);
287
288 // https://html.spec.whatwg.org/multipage/#dom-websocket-url
Url(&self) -> DOMString289 fn Url(&self) -> DOMString {
290 DOMString::from(self.url.as_str())
291 }
292
293 // https://html.spec.whatwg.org/multipage/#dom-websocket-readystate
ReadyState(&self) -> u16294 fn ReadyState(&self) -> u16 {
295 self.ready_state.get() as u16
296 }
297
298 // https://html.spec.whatwg.org/multipage/#dom-websocket-bufferedamount
BufferedAmount(&self) -> u64299 fn BufferedAmount(&self) -> u64 {
300 self.buffered_amount.get()
301 }
302
303 // https://html.spec.whatwg.org/multipage/#dom-websocket-binarytype
BinaryType(&self) -> BinaryType304 fn BinaryType(&self) -> BinaryType {
305 self.binary_type.get()
306 }
307
308 // https://html.spec.whatwg.org/multipage/#dom-websocket-binarytype
SetBinaryType(&self, btype: BinaryType)309 fn SetBinaryType(&self, btype: BinaryType) {
310 self.binary_type.set(btype)
311 }
312
313 // https://html.spec.whatwg.org/multipage/#dom-websocket-protocol
Protocol(&self) -> DOMString314 fn Protocol(&self) -> DOMString {
315 DOMString::from(self.protocol.borrow().clone())
316 }
317
318 // https://html.spec.whatwg.org/multipage/#dom-websocket-send
Send(&self, data: USVString) -> ErrorResult319 fn Send(&self, data: USVString) -> ErrorResult {
320 let data_byte_len = data.0.as_bytes().len() as u64;
321 let send_data = self.send_impl(data_byte_len)?;
322
323 if send_data {
324 let mut other_sender = self.sender.borrow_mut();
325 let my_sender = other_sender.as_mut().unwrap();
326 let _ = my_sender.send(WebSocketDomAction::SendMessage(MessageData::Text(data.0)));
327 }
328
329 Ok(())
330 }
331
332 // https://html.spec.whatwg.org/multipage/#dom-websocket-send
Send_(&self, blob: &Blob) -> ErrorResult333 fn Send_(&self, blob: &Blob) -> ErrorResult {
334 /* As per https://html.spec.whatwg.org/multipage/#websocket
335 the buffered amount needs to be clamped to u32, even though Blob.Size() is u64
336 If the buffer limit is reached in the first place, there are likely other major problems
337 */
338 let data_byte_len = blob.Size();
339 let send_data = self.send_impl(data_byte_len)?;
340
341 if send_data {
342 let mut other_sender = self.sender.borrow_mut();
343 let my_sender = other_sender.as_mut().unwrap();
344 let bytes = blob.get_bytes().unwrap_or(vec![]);
345 let _ = my_sender.send(WebSocketDomAction::SendMessage(MessageData::Binary(bytes)));
346 }
347
348 Ok(())
349 }
350
351 // https://html.spec.whatwg.org/multipage/#dom-websocket-close
Close(&self, code: Option<u16>, reason: Option<USVString>) -> ErrorResult352 fn Close(&self, code: Option<u16>, reason: Option<USVString>) -> ErrorResult {
353 if let Some(code) = code {
354 //Fail if the supplied code isn't normal and isn't reserved for libraries, frameworks, and applications
355 if code != close_code::NORMAL && (code < 3000 || code > 4999) {
356 return Err(Error::InvalidAccess);
357 }
358 }
359 if let Some(ref reason) = reason {
360 if reason.0.as_bytes().len() > 123 { //reason cannot be larger than 123 bytes
361 return Err(Error::Syntax);
362 }
363 }
364
365 match self.ready_state.get() {
366 WebSocketRequestState::Closing | WebSocketRequestState::Closed => {} //Do nothing
367 WebSocketRequestState::Connecting => { //Connection is not yet established
368 /*By setting the state to closing, the open function
369 will abort connecting the websocket*/
370 self.ready_state.set(WebSocketRequestState::Closing);
371
372 let address = Trusted::new(self);
373 let task_source = self.global().networking_task_source();
374 fail_the_websocket_connection(address, &task_source, &self.global().task_canceller());
375 }
376 WebSocketRequestState::Open => {
377 self.ready_state.set(WebSocketRequestState::Closing);
378
379 // Kick off _Start the WebSocket Closing Handshake_
380 // https://tools.ietf.org/html/rfc6455#section-7.1.2
381 let reason = reason.map(|reason| reason.0);
382 let mut other_sender = self.sender.borrow_mut();
383 let my_sender = other_sender.as_mut().unwrap();
384 let _ = my_sender.send(WebSocketDomAction::Close(code, reason));
385 }
386 }
387 Ok(()) //Return Ok
388 }
389 }
390
391
392 /// Task queued when *the WebSocket connection is established*.
393 /// <https://html.spec.whatwg.org/multipage/#feedback-from-the-protocol:concept-websocket-established>
394 struct ConnectionEstablishedTask {
395 address: Trusted<WebSocket>,
396 protocol_in_use: Option<String>,
397 }
398
399 impl TaskOnce for ConnectionEstablishedTask {
400 /// <https://html.spec.whatwg.org/multipage/#feedback-from-the-protocol:concept-websocket-established>
run_once(self)401 fn run_once(self) {
402 let ws = self.address.root();
403
404 // Step 1.
405 ws.ready_state.set(WebSocketRequestState::Open);
406
407 // Step 2: Extensions.
408 // TODO: Set extensions to extensions in use.
409
410 // Step 3.
411 if let Some(protocol_name) = self.protocol_in_use {
412 *ws.protocol.borrow_mut() = protocol_name;
413 };
414
415 // Step 4.
416 ws.upcast().fire_event(atom!("open"));
417 }
418 }
419
420 struct BufferedAmountTask {
421 address: Trusted<WebSocket>,
422 }
423
424 impl TaskOnce for BufferedAmountTask {
425 // See https://html.spec.whatwg.org/multipage/#dom-websocket-bufferedamount
426 //
427 // To be compliant with standards, we need to reset bufferedAmount only when the event loop
428 // reaches step 1. In our implementation, the bytes will already have been sent on a background
429 // thread.
run_once(self)430 fn run_once(self) {
431 let ws = self.address.root();
432
433 ws.buffered_amount.set(0);
434 ws.clearing_buffer.set(false);
435 }
436 }
437
438 struct CloseTask {
439 address: Trusted<WebSocket>,
440 failed: bool,
441 code: Option<u16>,
442 reason: Option<String>,
443 }
444
445 impl TaskOnce for CloseTask {
run_once(self)446 fn run_once(self) {
447 let ws = self.address.root();
448
449 if ws.ready_state.get() == WebSocketRequestState::Closed {
450 // Do nothing if already closed.
451 return;
452 }
453
454 // Perform _the WebSocket connection is closed_ steps.
455 // https://html.spec.whatwg.org/multipage/#closeWebSocket
456
457 // Step 1.
458 ws.ready_state.set(WebSocketRequestState::Closed);
459
460 // Step 2.
461 if self.failed {
462 ws.upcast().fire_event(atom!("error"));
463 }
464
465 // Step 3.
466 let clean_close = !self.failed;
467 let code = self.code.unwrap_or(close_code::NO_STATUS);
468 let reason = DOMString::from(self.reason.unwrap_or("".to_owned()));
469 let close_event = CloseEvent::new(&ws.global(),
470 atom!("close"),
471 EventBubbles::DoesNotBubble,
472 EventCancelable::NotCancelable,
473 clean_close,
474 code,
475 reason);
476 close_event.upcast::<Event>().fire(ws.upcast());
477 }
478 }
479
/// Task that delivers one received message to script as a message event.
struct MessageReceivedTask {
    /// The socket the message arrived on.
    address: Trusted<WebSocket>,
    /// The received payload (text or binary).
    message: MessageData,
}

impl TaskOnce for MessageReceivedTask {
    /// Converts the payload to a JS value and dispatches it on the socket.
    ///
    /// Text becomes a JS string; binary data becomes a `Blob` or an
    /// `ArrayBuffer` depending on the socket's binary type. Messages are
    /// dropped unless the socket is open.
    ///
    /// # Panics
    ///
    /// Panics if the `ArrayBuffer` cannot be created.
    #[allow(unsafe_code)]
    fn run_once(self) {
        let ws = self.address.root();
        debug!("MessageReceivedTask::handler({:p}): readyState={:?}", &*ws,
               ws.ready_state.get());

        // Step 1.
        if ws.ready_state.get() != WebSocketRequestState::Open {
            return;
        }

        // Step 2-5.
        let global = ws.global();
        // global.get_cx() returns a valid `JSContext` pointer, so this is safe.
        unsafe {
            let cx = global.get_cx();
            // Enter the compartment of the socket's reflector before creating JS values.
            let _ac = JSAutoCompartment::new(cx, ws.reflector().get_jsobject().get());
            rooted!(in(cx) let mut message = UndefinedValue());
            match self.message {
                MessageData::Text(text) => text.to_jsval(cx, message.handle_mut()),
                MessageData::Binary(data) => {
                    // The representation is chosen by the binary type at delivery time.
                    match ws.binary_type.get() {
                        BinaryType::Blob => {
                            let blob = Blob::new(&global, BlobImpl::new_from_bytes(data), "".to_owned());
                            blob.to_jsval(cx, message.handle_mut());
                        }
                        BinaryType::Arraybuffer => {
                            rooted!(in(cx) let mut array_buffer = ptr::null_mut::<JSObject>());
                            assert!(ArrayBuffer::create(cx,
                                                        CreateWith::Slice(&data),
                                                        array_buffer.handle_mut())
                                    .is_ok());

                            (*array_buffer).to_jsval(cx, message.handle_mut());
                        }

                    }
                },
            }
            MessageEvent::dispatch_jsval(ws.upcast(), &global, message.handle());
        }
    }
}
529