1package Mojo::Transaction::WebSocket;
2use Mojo::Base 'Mojo::Transaction';
3
4use Compress::Raw::Zlib qw(Z_SYNC_FLUSH);
5use List::Util qw(first);
6use Mojo::JSON qw(encode_json j);
7use Mojo::Util qw(decode encode trim);
8use Mojo::WebSocket qw(WS_BINARY WS_CLOSE WS_CONTINUATION WS_PING WS_PONG WS_TEXT);
9
10has [qw(compressed established handshake masked)];
11has max_websocket_size => sub { $ENV{MOJO_MAX_WEBSOCKET_SIZE} || 262144 };
12
13sub build_message {
14  my ($self, $frame) = @_;
15
16  # Text
17  $frame = {text => encode('UTF-8', $frame)} if ref $frame ne 'HASH';
18
19  # JSON
20  $frame->{text} = encode_json($frame->{json}) if exists $frame->{json};
21
22  # Raw text or binary
23  if   (exists $frame->{text}) { $frame = [1, 0, 0, 0, WS_TEXT,   $frame->{text}] }
24  else                         { $frame = [1, 0, 0, 0, WS_BINARY, $frame->{binary}] }
25
26  # "permessage-deflate" extension
27  return $frame unless $self->compressed;
28  my $deflate = $self->{deflate}
29    ||= Compress::Raw::Zlib::Deflate->new(AppendOutput => 1, MemLevel => 8, WindowBits => -15);
30  $deflate->deflate($frame->[5], my $out);
31  $deflate->flush($out, Z_SYNC_FLUSH);
32  @$frame[1, 5] = (1, substr($out, 0, length($out) - 4));
33
34  return $frame;
35}
36
37sub client_read  { shift->server_read(@_) }
38sub client_write { shift->server_write(@_) }
39
40sub closed {
41  my $self = shift->completed;
42  my @args = $self->{close} ? (@{$self->{close}}) : (1006);
43  return $self->emit(finish => @args > 1 ? @args : (@args, undef));
44}
45
46sub connection { shift->handshake->connection }
47
48sub finish {
49  my $self = shift;
50
51  my $close   = $self->{close} = [@_];
52  my $payload = $close->[0] ? pack('n', $close->[0]) : '';
53  $payload .= encode 'UTF-8', $close->[1] if defined $close->[1];
54  $close->[0] //= 1005;
55  $self->send([1, 0, 0, 0, WS_CLOSE, $payload])->{closing} = 1;
56
57  return $self;
58}
59
60sub is_websocket {1}
61
62sub kept_alive    { shift->handshake->kept_alive }
63sub local_address { shift->handshake->local_address }
64sub local_port    { shift->handshake->local_port }
65
66sub parse_message {
67  my ($self, $frame) = @_;
68
69  $self->emit(frame => $frame);
70
71  # Ping/Pong
72  my $op = $frame->[4];
73  return $self->send([1, 0, 0, 0, WS_PONG, $frame->[5]]) if $op == WS_PING;
74  return undef                                           if $op == WS_PONG;
75
76  # Close
77  if ($op == WS_CLOSE) {
78    return $self->finish unless length $frame->[5] >= 2;
79    return $self->finish(unpack('n', substr($frame->[5], 0, 2, '')), decode('UTF-8', $frame->[5]));
80  }
81
82  # Append chunk and check message size
83  @{$self}{qw(op pmc)} = ($op, $self->compressed && $frame->[1]) unless exists $self->{op};
84  $self->{message} .= $frame->[5];
85  my $max = $self->max_websocket_size;
86  return $self->finish(1009) if length $self->{message} > $max;
87
88  # No FIN bit (Continuation)
89  return undef unless $frame->[0];
90
91  # "permessage-deflate" extension (handshake and RSV1)
92  my $msg = delete $self->{message};
93  if ($self->compressed && $self->{pmc}) {
94    my $inflate = $self->{inflate}
95      ||= Compress::Raw::Zlib::Inflate->new(Bufsize => $max, LimitOutput => 1, WindowBits => -15);
96    $inflate->inflate(($msg .= "\x00\x00\xff\xff"), my $out);
97    return $self->finish(1009) if length $msg;
98    $msg = $out;
99  }
100
101  $self->emit(json => j($msg)) if $self->has_subscribers('json');
102  $op = delete $self->{op};
103  $self->emit($op == WS_TEXT ? 'text' : 'binary' => $msg);
104  $self->emit(message => $op == WS_TEXT ? decode 'UTF-8', $msg : $msg) if $self->has_subscribers('message');
105}
106
107sub protocol { shift->res->headers->sec_websocket_protocol }
108
109sub remote_address { shift->handshake->remote_address }
110sub remote_port    { shift->handshake->remote_port }
111sub req            { shift->handshake->req }
112sub res            { shift->handshake->res }
113
114sub resume { $_[0]->handshake->resume and return $_[0] }
115
116sub send {
117  my ($self, $msg, $cb) = @_;
118  $self->once(drain => $cb) if $cb;
119  $msg = $self->build_message($msg) unless ref $msg eq 'ARRAY';
120  $self->{write} .= Mojo::WebSocket::build_frame($self->masked, @$msg);
121  return $self->emit('resume');
122}
123
124sub server_read {
125  my ($self, $chunk) = @_;
126
127  $self->{read} .= $chunk;
128  my $max = $self->max_websocket_size;
129  while (my $frame = Mojo::WebSocket::parse_frame(\$self->{read}, $max)) {
130    $self->finish(1009) and last unless ref $frame;
131    $self->parse_message($frame);
132  }
133
134  $self->emit('resume');
135}
136
137sub server_write {
138  my $self = shift;
139  $self->emit('drain') unless length($self->{write} //= '');
140  $self->completed if !length $self->{write} && $self->{closing};
141  return delete $self->{write};
142}
143
144sub with_compression {
145  my $self = shift;
146
147  # "permessage-deflate" extension
148  $self->compressed(1) and $self->res->headers->sec_websocket_extensions('permessage-deflate')
149    if ($self->req->headers->sec_websocket_extensions // '') =~ /permessage-deflate/;
150}
151
152sub with_protocols {
153  my $self = shift;
154
155  my %protos = map { trim($_) => 1 } split /,/, $self->req->headers->sec_websocket_protocol // '';
156  return undef unless defined(my $proto = first { $protos{$_} } @_);
157
158  $self->res->headers->sec_websocket_protocol($proto);
159  return $proto;
160}
161
1621;
163
164=encoding utf8
165
166=head1 NAME
167
168Mojo::Transaction::WebSocket - WebSocket transaction
169
170=head1 SYNOPSIS
171
172  use Mojo::Transaction::WebSocket;
173
174  # Send and receive WebSocket messages
175  my $ws = Mojo::Transaction::WebSocket->new;
176  $ws->send('Hello World!');
177  $ws->on(message => sub ($ws, $msg) { say "Message: $msg" });
178  $ws->on(finish => sub ($ws, $code, $reason) { say "WebSocket closed with status $code." });
179
180=head1 DESCRIPTION
181
182L<Mojo::Transaction::WebSocket> is a container for WebSocket transactions, based on L<RFC
1836455|https://tools.ietf.org/html/rfc6455> and L<RFC 7692|https://tools.ietf.org/html/rfc7692>.
184
185=head1 EVENTS
186
187L<Mojo::Transaction::WebSocket> inherits all events from L<Mojo::Transaction> and can emit the following new ones.
188
189=head2 binary
190
191  $ws->on(binary => sub ($ws, $bytes) {...});
192
193Emitted when a complete WebSocket binary message has been received.
194
195  $ws->on(binary => sub ($ws, $bytes) { say "Binary: $bytes" });
196
197=head2 drain
198
199  $ws->on(drain => sub ($ws) {...});
200
201Emitted once all data has been sent.
202
203  $ws->on(drain => sub ($ws) { $ws->send(time) });
204
205=head2 finish
206
207  $ws->on(finish => sub ($ws, $code, $reason) {...});
208
209Emitted when the WebSocket connection has been closed.
210
211=head2 frame
212
213  $ws->on(frame => sub ($ws, $frame) {...});
214
215Emitted when a WebSocket frame has been received.
216
217  $ws->on(frame => sub ($ws, $frame) {
218    say "FIN: $frame->[0]";
219    say "RSV1: $frame->[1]";
220    say "RSV2: $frame->[2]";
221    say "RSV3: $frame->[3]";
222    say "Opcode: $frame->[4]";
223    say "Payload: $frame->[5]";
224  });
225
226=head2 json
227
228  $ws->on(json => sub ($ws, $json) {...});
229
230Emitted when a complete WebSocket message has been received, all text and binary messages will be automatically JSON
231decoded. Note that this event only gets emitted when it has at least one subscriber.
232
233  $ws->on(json => sub ($ws, $hash) { say "Message: $hash->{msg}" });
234
235=head2 message
236
237  $ws->on(message => sub ($ws, $msg) {...});
238
239Emitted when a complete WebSocket message has been received, text messages will be automatically decoded. Note that
240this event only gets emitted when it has at least one subscriber.
241
242  $ws->on(message => sub ($ws, $msg) { say "Message: $msg" });
243
244=head2 resume
245
246  $tx->on(resume => sub ($tx) {...});
247
248Emitted when transaction is resumed.
249
250=head2 text
251
252  $ws->on(text => sub ($ws, $bytes) {...});
253
254Emitted when a complete WebSocket text message has been received.
255
256  $ws->on(text => sub ($ws, $bytes) { say "Text: $bytes" });
257
258=head1 ATTRIBUTES
259
260L<Mojo::Transaction::WebSocket> inherits all attributes from L<Mojo::Transaction> and implements the following new
261ones.
262
263=head2 compressed
264
265  my $bool = $ws->compressed;
266  $ws      = $ws->compressed($bool);
267
268Compress messages with C<permessage-deflate> extension.
269
270=head2 established
271
272  my $bool = $ws->established;
273  $ws      = $ws->established($bool);
274
275WebSocket connection established.
276
277=head2 handshake
278
279  my $handshake = $ws->handshake;
280  $ws           = $ws->handshake(Mojo::Transaction::HTTP->new);
281
282The original handshake transaction, usually a L<Mojo::Transaction::HTTP> object.
283
284=head2 masked
285
286  my $bool = $ws->masked;
287  $ws      = $ws->masked($bool);
288
289Mask outgoing frames with XOR cipher and a random 32-bit key.
290
291=head2 max_websocket_size
292
293  my $size = $ws->max_websocket_size;
294  $ws      = $ws->max_websocket_size(1024);
295
296Maximum WebSocket message size in bytes, defaults to the value of the C<MOJO_MAX_WEBSOCKET_SIZE> environment variable
297or C<262144> (256KiB).
298
299=head1 METHODS
300
301L<Mojo::Transaction::WebSocket> inherits all methods from L<Mojo::Transaction> and implements the following new ones.
302
303=head2 build_message
304
305  my $frame = $ws->build_message({binary => $bytes});
306  my $frame = $ws->build_message({text   => $bytes});
307  my $frame = $ws->build_message({json   => {test => [1, 2, 3]}});
308  my $frame = $ws->build_message($chars);
309
310Build WebSocket message.
311
312=head2 client_read
313
314  $ws->client_read($data);
315
316Read data client-side, used to implement user agents such as L<Mojo::UserAgent>.
317
318=head2 client_write
319
320  my $bytes = $ws->client_write;
321
322Write data client-side, used to implement user agents such as L<Mojo::UserAgent>.
323
324=head2 closed
325
326  $tx = $tx->closed;
327
328Same as L<Mojo::Transaction/"completed">, but also indicates that all transaction data has been sent.
329
330=head2 connection
331
332  my $id = $ws->connection;
333
334Connection identifier.
335
336=head2 finish
337
338  $ws = $ws->finish;
339  $ws = $ws->finish(1000);
340  $ws = $ws->finish(1003 => 'Cannot accept data!');
341
342Close WebSocket connection gracefully.
343
344=head2 is_websocket
345
346  my $bool = $ws->is_websocket;
347
348True, this is a L<Mojo::Transaction::WebSocket> object.
349
350=head2 kept_alive
351
352  my $bool = $ws->kept_alive;
353
354Connection has been kept alive.
355
356=head2 local_address
357
358  my $address = $ws->local_address;
359
360Local interface address.
361
362=head2 local_port
363
364  my $port = $ws->local_port;
365
366Local interface port.
367
368=head2 parse_message
369
370  $ws->parse_message([$fin, $rsv1, $rsv2, $rsv3, $op, $payload]);
371
372Parse WebSocket message.
373
374=head2 protocol
375
376  my $proto = $ws->protocol;
377
378Return negotiated subprotocol or C<undef>.
379
380=head2 remote_address
381
382  my $address = $ws->remote_address;
383
384Remote interface address.
385
386=head2 remote_port
387
388  my $port = $ws->remote_port;
389
390Remote interface port.
391
392=head2 req
393
394  my $req = $ws->req;
395
396Handshake request, usually a L<Mojo::Message::Request> object.
397
398=head2 res
399
400  my $res = $ws->res;
401
402Handshake response, usually a L<Mojo::Message::Response> object.
403
404=head2 resume
405
406  $ws = $ws->resume;
407
408Resume L</"handshake"> transaction.
409
410=head2 send
411
412  $ws = $ws->send({binary => $bytes});
413  $ws = $ws->send({text   => $bytes});
414  $ws = $ws->send({json   => {test => [1, 2, 3]}});
415  $ws = $ws->send([$fin, $rsv1, $rsv2, $rsv3, $op, $payload]);
416  $ws = $ws->send($chars);
417  $ws = $ws->send($chars => sub {...});
418
419Send message or frame non-blocking via WebSocket, the optional drain callback will be executed once all data has been
420written.
421
422  # Send "Ping" frame
423  use Mojo::WebSocket qw(WS_PING);
424  $ws->send([1, 0, 0, 0, WS_PING, 'Hello World!']);
425
426=head2 server_read
427
428  $ws->server_read($data);
429
430Read data server-side, used to implement web servers such as L<Mojo::Server::Daemon>.
431
432=head2 server_write
433
434  my $bytes = $ws->server_write;
435
436Write data server-side, used to implement web servers such as L<Mojo::Server::Daemon>.
437
438=head2 with_compression
439
440  $ws->with_compression;
441
442Negotiate C<permessage-deflate> extension for this WebSocket connection.
443
444=head2 with_protocols
445
446  my $proto = $ws->with_protocols('v2.proto', 'v1.proto');
447
448Negotiate subprotocol for this WebSocket connection.
449
450=head1 SEE ALSO
451
452L<Mojolicious>, L<Mojolicious::Guides>, L<https://mojolicious.org>.
453
454=cut
455