package Mojo::Transaction::WebSocket;
use Mojo::Base 'Mojo::Transaction';

use Compress::Raw::Zlib qw(Z_SYNC_FLUSH);
use List::Util qw(first);
use Mojo::JSON qw(encode_json j);
use Mojo::Util qw(decode encode trim);
use Mojo::WebSocket qw(WS_BINARY WS_CLOSE WS_CONTINUATION WS_PING WS_PONG WS_TEXT);

has [qw(compressed established handshake masked)];

# Falls back to 262144 when the environment variable is unset or false
has max_websocket_size => sub { $ENV{MOJO_MAX_WEBSOCKET_SIZE} || 262144 };

# Turn characters or a {text|binary|json => ...} hash into a single frame
# [$fin, $rsv1, $rsv2, $rsv3, $op, $payload], deflated if compression is on
sub build_message {
  my ($self, $msg) = @_;

  # Everything that is not a hash reference is treated as characters
  $msg = {text => encode('UTF-8', $msg)} unless ref($msg) eq 'HASH';

  # JSON gets serialized into the text payload (stored in the passed hash)
  $msg->{text} = encode_json($msg->{json}) if exists $msg->{json};

  # Text wins over binary
  my ($opcode, $payload) = exists $msg->{text} ? (WS_TEXT, $msg->{text}) : (WS_BINARY, $msg->{binary});
  my $frame = [1, 0, 0, 0, $opcode, $payload];

  # Nothing more to do without the "permessage-deflate" extension
  return $frame unless $self->compressed;

  # The deflate context is created once and cached on the transaction
  my $deflate = $self->{deflate}
    ||= Compress::Raw::Zlib::Deflate->new(AppendOutput => 1, MemLevel => 8, WindowBits => -15);
  $deflate->deflate($frame->[5], my $out);
  $deflate->flush($out, Z_SYNC_FLUSH);

  # Set RSV1 and drop the last four bytes of the flushed output
  # ("parse_message" appends "\x00\x00\xff\xff" again before inflating)
  $frame->[1] = 1;
  $frame->[5] = substr $out, 0, length($out) - 4;

  return $frame;
}

# Client-side I/O is delegated to the server-side implementation
sub client_read {
  my $self = shift;
  return $self->server_read(@_);
}

sub client_write {
  my $self = shift;
  return $self->server_write(@_);
}

# Mark the transaction as completed and emit "finish" with code and reason
sub closed {
  my $self = shift->completed;

  # Status 1006 is reported when no close status has been recorded
  my @status = $self->{close} ? @{$self->{close}} : (1006);

  # Subscribers always get a reason argument, even if it is undefined
  push @status, undef unless @status > 1;

  return $self->emit(finish => @status);
}

sub connection {
  my $self = shift;
  return $self->handshake->connection;
}

# Queue a "Close" frame built from an optional status code and reason
sub finish {
  my ($self, @status) = @_;

  # Remember the status for "closed"
  my $close = $self->{close} = [@status];

  # Payload is the packed status code followed by the encoded reason
  my $payload = '';
  $payload = pack 'n', $close->[0] if $close->[0];
  $payload .= encode('UTF-8', $close->[1]) if defined $close->[1];

  # Status 1005 is recorded when no code was given
  $close->[0] //= 1005;

  $self->send([1, 0, 0, 0, WS_CLOSE, $payload])->{closing} = 1;

  return $self;
}

sub is_websocket { return 1 }

sub kept_alive {
  my $self = shift;
  return $self->handshake->kept_alive;
}

sub local_address {
  my $self = shift;
  return $self->handshake->local_address;
}

sub local_port {
  my $self = shift;
  return $self->handshake->local_port;
}

# Process one frame: answer control frames, collect data frames and emit
# events once a message is complete
sub parse_message {
  my ($self, $frame) = @_;

  $self->emit(frame => $frame);

  # Ping is answered with a Pong carrying the same payload, Pong is ignored
  my $opcode = $frame->[4];
  if ($opcode == WS_PING) { return $self->send([1, 0, 0, 0, WS_PONG, $frame->[5]]) }
  if ($opcode == WS_PONG) { return undef }

  # Close (the first two payload bytes are the status code, the rest the reason)
  if ($opcode == WS_CLOSE) {
    return $self->finish if length($frame->[5]) < 2;
    my $code = unpack 'n', substr($frame->[5], 0, 2, '');
    return $self->finish($code, decode('UTF-8', $frame->[5]));
  }

  # The first frame of a message decides opcode and compression
  unless (exists $self->{op}) {
    $self->{op}  = $opcode;
    $self->{pmc} = $self->compressed && $frame->[1];
  }

  # Append chunk and close with status 1009 if the message grows too large
  $self->{message} .= $frame->[5];
  my $limit = $self->max_websocket_size;
  return $self->finish(1009) if length($self->{message}) > $limit;

  # No FIN bit, wait for more frames (Continuation)
  return undef unless $frame->[0];

  # "permessage-deflate" extension (handshake and RSV1)
  my $msg = delete $self->{message};
  if ($self->compressed && $self->{pmc}) {
    my $inflate = $self->{inflate}
      ||= Compress::Raw::Zlib::Inflate->new(Bufsize => $limit, LimitOutput => 1, WindowBits => -15);
    $msg .= "\x00\x00\xff\xff";
    $inflate->inflate($msg, my $out);

    # Input left over after inflating is answered with status 1009
    return $self->finish(1009) if length $msg;
    $msg = $out;
  }

  $self->emit(json => j($msg)) if $self->has_subscribers('json');

  # The message is complete, so the stored opcode is released
  my $is_text = delete($self->{op}) == WS_TEXT;
  $self->emit(($is_text ? 'text' : 'binary'), $msg);
  $self->emit(message => $is_text ? decode('UTF-8', $msg) : $msg) if $self->has_subscribers('message');
}

sub protocol {
  my $self = shift;
  return $self->res->headers->sec_websocket_protocol;
}

sub remote_address {
  my $self = shift;
  return $self->handshake->remote_address;
}

sub remote_port {
  my $self = shift;
  return $self->handshake->remote_port;
}

sub req {
  my $self = shift;
  return $self->handshake->req;
}

sub res {
  my $self = shift;
  return $self->handshake->res;
}

# Resume the handshake transaction and return the invocant on success
sub resume {
  my $self = shift;
  $self->handshake->resume and return $self;
}

# Append a message or raw frame to the write buffer and ask to be resumed
sub send {
  my ($self, $msg, $cb) = @_;

  # Optional drain callback
  $self->once(drain => $cb) if $cb;

  # Array references are raw frames, everything else has to be built first
  my $frame = ref($msg) eq 'ARRAY' ? $msg : $self->build_message($msg);
  $self->{write} .= Mojo::WebSocket::build_frame($self->masked, @$frame);

  return $self->emit('resume');
}

# Buffer incoming bytes and process every frame that can be parsed from them
sub server_read {
  my ($self, $chunk) = @_;

  $self->{read} .= $chunk;
  my $limit = $self->max_websocket_size;
  while (my $frame = Mojo::WebSocket::parse_frame(\$self->{read}, $limit)) {

    # A true value that is not a reference ends parsing with status 1009
    unless (ref $frame) {
      $self->finish(1009);
      last;
    }

    $self->parse_message($frame);
  }

  return $self->emit('resume');
}

# Hand over the write buffer, emitting "drain" first if it is empty
sub server_write {
  my $self = shift;

  $self->{write} //= '';
  $self->emit('drain') if !length $self->{write};

  # Checked again, "drain" subscribers may have queued more data
  if ($self->{closing} && !length $self->{write}) { $self->completed }

  return delete $self->{write};
}

sub with_compression {
  my $self = shift;

  # "permessage-deflate" extension
  my $offered = $self->req->headers->sec_websocket_extensions // '';
  $self->compressed(1) and $self->res->headers->sec_websocket_extensions('permessage-deflate')
    if $offered =~ /permessage-deflate/;
}

# Pick the first of the given subprotocols that the request asked for
sub with_protocols {
  my ($self, @supported) = @_;

  # Subprotocols from the request header
  my $header = $self->req->headers->sec_websocket_protocol // '';
  my %requested;
  $requested{trim($_)} = 1 for split /,/, $header;

  my $proto = first { $requested{$_} } @supported;
  return undef unless defined $proto;

  $self->res->headers->sec_websocket_protocol($proto);
  return $proto;
}

1;

=encoding utf8

=head1 NAME

Mojo::Transaction::WebSocket - WebSocket transaction

=head1 SYNOPSIS

  use Mojo::Transaction::WebSocket;

  # Send and
  # receive WebSocket messages
  my $ws = Mojo::Transaction::WebSocket->new;
  $ws->send('Hello World!');
  $ws->on(message => sub ($ws, $msg) { say "Message: $msg" });
  $ws->on(finish => sub ($ws, $code, $reason) { say "WebSocket closed with status $code." });

=head1 DESCRIPTION

L<Mojo::Transaction::WebSocket> is a container for WebSocket transactions, based on
L<RFC 6455|https://tools.ietf.org/html/rfc6455> and L<RFC 7692|https://tools.ietf.org/html/rfc7692>.

=head1 EVENTS

L<Mojo::Transaction::WebSocket> inherits all events from L<Mojo::Transaction> and can emit the following new ones.

=head2 binary

  $ws->on(binary => sub ($ws, $bytes) {...});

Emitted when a complete WebSocket binary message has been received.

  $ws->on(binary => sub ($ws, $bytes) { say "Binary: $bytes" });

=head2 drain

  $ws->on(drain => sub ($ws) {...});

Emitted once all data has been sent.

  $ws->on(drain => sub ($ws) { $ws->send(time) });

=head2 finish

  $ws->on(finish => sub ($ws, $code, $reason) {...});

Emitted when the WebSocket connection has been closed.

=head2 frame

  $ws->on(frame => sub ($ws, $frame) {...});

Emitted when a WebSocket frame has been received.

  $ws->on(frame => sub ($ws, $frame) {
    say "FIN: $frame->[0]";
    say "RSV1: $frame->[1]";
    say "RSV2: $frame->[2]";
    say "RSV3: $frame->[3]";
    say "Opcode: $frame->[4]";
    say "Payload: $frame->[5]";
  });

=head2 json

  $ws->on(json => sub ($ws, $json) {...});

Emitted when a complete WebSocket message has been received, all text and binary messages will be automatically JSON
decoded. Note that this event only gets emitted when it has at least one subscriber.

  $ws->on(json => sub ($ws, $hash) { say "Message: $hash->{msg}" });

=head2 message

  $ws->on(message => sub ($ws, $msg) {...});

Emitted when a complete WebSocket message has been received, text messages will be automatically decoded. Note that
this event only gets emitted when it has at least one subscriber.

  $ws->on(message => sub ($ws, $msg) { say "Message: $msg" });

=head2 resume

  $tx->on(resume => sub ($tx) {...});

Emitted when transaction is resumed.

=head2 text

  $ws->on(text => sub ($ws, $bytes) {...});

Emitted when a complete WebSocket text message has been received.

  $ws->on(text => sub ($ws, $bytes) { say "Text: $bytes" });

=head1 ATTRIBUTES

L<Mojo::Transaction::WebSocket> inherits all attributes from L<Mojo::Transaction> and implements the following new
ones.

=head2 compressed

  my $bool = $ws->compressed;
  $ws      = $ws->compressed($bool);

Compress messages with C<permessage-deflate> extension.

=head2 established

  my $bool = $ws->established;
  $ws      = $ws->established($bool);

WebSocket connection established.

=head2 handshake

  my $handshake = $ws->handshake;
  $ws           = $ws->handshake(Mojo::Transaction::HTTP->new);

The original handshake transaction, usually a L<Mojo::Transaction::HTTP> object.

=head2 masked

  my $bool = $ws->masked;
  $ws      = $ws->masked($bool);

Mask outgoing frames with XOR cipher and a random 32-bit key.

=head2 max_websocket_size

  my $size = $ws->max_websocket_size;
  $ws      = $ws->max_websocket_size(1024);

Maximum WebSocket message size in bytes, defaults to the value of the C<MOJO_MAX_WEBSOCKET_SIZE> environment variable
or C<262144> (256KiB).

=head1 METHODS

L<Mojo::Transaction::WebSocket> inherits all methods from L<Mojo::Transaction> and implements the following new ones.

=head2 build_message

  my $frame = $ws->build_message({binary => $bytes});
  my $frame = $ws->build_message({text   => $bytes});
  my $frame = $ws->build_message({json   => {test => [1, 2, 3]}});
  my $frame = $ws->build_message($chars);

Build WebSocket message.

=head2 client_read

  $ws->client_read($data);

Read data client-side, used to implement user agents such as L<Mojo::UserAgent>.

=head2 client_write

  my $bytes = $ws->client_write;

Write data client-side, used to implement user agents such as L<Mojo::UserAgent>.

=head2 closed

  $tx = $tx->closed;

Same as L<Mojo::Transaction/"completed">, but also indicates that all transaction data has been sent.

=head2 connection

  my $id = $ws->connection;

Connection identifier.

=head2 finish

  $ws = $ws->finish;
  $ws = $ws->finish(1000);
  $ws = $ws->finish(1003 => 'Cannot accept data!');

Close WebSocket connection gracefully.

=head2 is_websocket

  my $bool = $ws->is_websocket;

True, this is a L<Mojo::Transaction::WebSocket> object.

=head2 kept_alive

  my $bool = $ws->kept_alive;

Connection has been kept alive.

=head2 local_address

  my $address = $ws->local_address;

Local interface address.

=head2 local_port

  my $port = $ws->local_port;

Local interface port.

=head2 parse_message

  $ws->parse_message([$fin, $rsv1, $rsv2, $rsv3, $op, $payload]);

Parse WebSocket message.

=head2 protocol

  my $proto = $ws->protocol;

Return negotiated subprotocol or C<undef>.

=head2 remote_address

  my $address = $ws->remote_address;

Remote interface address.

=head2 remote_port

  my $port = $ws->remote_port;

Remote interface port.

=head2 req

  my $req = $ws->req;

Handshake request, usually a L<Mojo::Message::Request> object.

=head2 res

  my $res = $ws->res;

Handshake response, usually a L<Mojo::Message::Response> object.

=head2 resume

  $ws = $ws->resume;

Resume L</"handshake"> transaction.

=head2 send

  $ws = $ws->send({binary => $bytes});
  $ws = $ws->send({text   => $bytes});
  $ws = $ws->send({json   => {test => [1, 2, 3]}});
  $ws = $ws->send([$fin, $rsv1, $rsv2, $rsv3, $op, $payload]);
  $ws = $ws->send($chars);
  $ws = $ws->send($chars => sub {...});

Send message or frame non-blocking via WebSocket, the optional drain callback will be executed once all data has been
written.

  # Send "Ping" frame
  use Mojo::WebSocket qw(WS_PING);
  $ws->send([1, 0, 0, 0, WS_PING, 'Hello World!']);

=head2 server_read

  $ws->server_read($data);

Read data server-side, used to implement web servers such as L<Mojo::Server::Daemon>.

=head2 server_write

  my $bytes = $ws->server_write;

Write data server-side, used to implement web servers such as L<Mojo::Server::Daemon>.

=head2 with_compression

  $ws->with_compression;

Negotiate C<permessage-deflate> extension for this WebSocket connection.

=head2 with_protocols

  my $proto = $ws->with_protocols('v2.proto', 'v1.proto');

Negotiate subprotocol for this WebSocket connection.

=head1 SEE ALSO

L<Mojolicious>, L<Mojolicious::Guides>, L<https://mojolicious.org>.

=cut