# $Id: Message.pm,v 1.14 2009/11/30 14:28:19 dk Exp $

use strict;
use warnings;

package IO::Lambda::Message;

our $CRLF      = "\x0a";
our @EXPORT_OK = qw(message);
our $DEBUG     = $IO::Lambda::DEBUG{message} || 0;

use Carp;
# Install Exporter's import() so that `use IO::Lambda::Message qw(message)`
# really exports message(); merely loading Exporter exports nothing.
use Exporter qw(import);
use IO::Lambda qw(:all :dev);

# Debug label for a messenger object.
sub _d { "message(" . _o($_[0]) . ")" }

# new $class, $reader, $writer, %options
#
# Creates a messenger bound to a read handle and a write handle. When the
# write handle is omitted (false), the read handle is used for both
# directions. Croaks if the read handle is missing. Recognized options are
# reader, writer, buf and async; anything else in %options is stored in the
# object as is.
sub new
{
    my ( $class, $r, $w, %opt ) = @_;

    $opt{reader} ||= sysreader;
    $opt{writer} ||= syswriter;
    $opt{buf}    ||= '';
    $opt{async}  ||= 0;

    croak "Invalid read handle" unless $r;
    $w = $r unless $w;

    my $self = bless {
        %opt,
        r     => $r,
        w     => $w,
        queue => [],    # entries are [ outer lambda, bind record, msg, deadline ]
    }, $class;

    warn "new ", _d($self) . "\n" if $DEBUG;

    return $self;
}

# Returns the (cached) lambda that frames and writes a single message:
# 8 hex digits of length, $CRLF, the payload, $CRLF.
# () :: (self, msg, deadline) -> error
sub sender
{
    $_[0]->{sender} ||= lambda {
        # the message is read through $_[1] rather than copied into a lexical
        my ( $self, undef, $deadline) = @_;
        my $msg = sprintf("%08x%s%s%s", length($_[1]), $CRLF, $_[1], $CRLF);
        warn _d($self), "msg > [$msg]\n" if $DEBUG > 1;
        context
            writebuf($self->{writer}), $self->{w},
            \ $msg, undef, 0, $deadline;
    tail {
        # second value returned by writebuf is the error, if any
        $_[1]
    }}
}

# Returns the (cached) lambda that reads a single framed message:
# first the fixed-size header, then the payload plus its terminator.
# () :: (self, deadline) -> (msg, error)
sub receiver
{
    $_[0]->{receiver} ||= lambda {
        my ( $self, $deadline) = @_;
        my $eol_size    = length $CRLF;
        my $header_size = 8 + $eol_size;    # "%08x" followed by $CRLF
        context
            readbuf($self->{reader}), $self->{r}, \$self->{buf},
            $header_size, $deadline;
    tail {
        my ( $size, $error) = @_;
        return ( undef, $error) if defined $error;

        # The header must be exactly 8 hex digits and the terminator;
        # anything else (e.g. 9 hex digits with no terminator) is rejected
        # instead of being silently misparsed.
        my $header = substr( $self->{buf}, 0, $header_size, '');
        return ( undef, "protocol error: chunk size not set")
            unless $header =~ /\A([0-9a-f]{8})\Q$CRLF\E\z/i;

        # payload is followed by one more terminator
        $size = $eol_size + hex $1;

        context
            readbuf($self->{reader}), $self->{r}, \$self->{buf},
            $size, $deadline;
    tail {
        my $error = $_[1];
        return ( undef, $error) if defined $error;
        my $msg = substr( $self->{buf}, 0, $size, '');
        # drop the trailing terminator
        substr( $msg, -$eol_size, $eol_size, '') if $eol_size;
        warn _d($self), "msg < [$msg]\n" if $DEBUG > 1;
        return $msg;
    }}}
}

# Returns the (cached) lambda that sends a message and then waits for the
# reply.
# () :: (self, msg, deadline) -> (response, error)
sub pusher
{
    $_[0]->{pusher} ||= lambda {
        my ( $self, undef, $deadline) = @_;
        context $self->sender, $self, $_[1], $deadline;
    tail {
        my ( $result, $error) = @_;
        return ( undef, $error) if defined $error;
        context $self->receiver, $self, $deadline;
        # IO::Lambda idiom: tail without a callback, so the receiver's
        # result becomes this lambda's result
        &tail();
    }}
}

# Handler for messages initiated by the peer; must be overridden by
# classes that enable the async option.
sub incoming { die "abstract call" }

# Returns the (cached) lambda that reads a message, passes it to incoming(),
# and sends back whatever incoming() returned.
# () :: (self, deadline) -> error
sub puller
{
    $_[0]->{puller} ||= lambda {
        my ( $self, $deadline) = @_;
        context $self->receiver, $self, $deadline;
    tail {
        my ( $msg, $error) = @_;
        return ( undef, $error) if defined $error;
        $msg = $self->incoming( $msg);

        context $self->sender, $self, $msg, $deadline;
        # IO::Lambda idiom: tail without a callback, pass the result through
        &tail();
    }}
}

# Accessor: with no argument returns the stored error, with one argument
# stores it.
sub error
{
    return $_[0]->{error} unless $#_;
    $_[0]->{error} = $_[1];
}

# Hook that converts a raw response before it is handed to the caller;
# the default returns the response unchanged.
sub outcoming { $_[1] }

# lambda that sends all available messages in queue
# () :: self -> error
sub queue_pusher
{
    $_[0]->{queue_pusher} ||= lambda {
        my $self = shift;

        warn _d($self) . ": sending msg ",
            length($self->{queue}->[0]->[2]), " bytes ",
            _t($self->{queue}->[0]->[3]),
            "\n" if $DEBUG;
        context $self->pusher,
            $self,
            $self->{queue}->[0]->[2],
            $self->{queue}->[0]->[3];
    tail {
        my ( $result, $error) = @_;
        if ( defined $error) {
            # a communication error is fatal for everything still queued
            $self->error($error);
            warn _d($self) . " > error $error\n" if $DEBUG;
            $self->cancel_queue( undef, $error);
            return $error;
        }

        # signal result to the outer lambda
        my $q = shift @{ $self->{queue} };
        unless ( $q) {
            # cancel_queue was called?
            return;
        }

        my ( $outer, $bind) = @$q;
        $outer->resolve( $bind);
        $outer->terminate( $self->outcoming( $result));

        # stop if it's all
        unless ( @{ $self->{queue} }) {
            warn _d($self) . ": push -> listen\n" if $DEBUG;
            $self->listen;
            return;
        }
        $q = $self->{queue}->[0];

        # fire up the next request
        warn _d($self) . ": sending msg ",
            length($q->[2]), " bytes ",
            _t($q->[3]),
            "\n" if $DEBUG;
        context $self->pusher, $self, $q->[2], $q->[3];
        again;
    }}
}

# lambda that serves incoming messages until the outgoing queue is non-empty
# () :: self -> error
sub listener
{
    $_[0]->{listener} ||= lambda {
        my $self = shift;
        context $self->puller, $self;
    tail {
        my ( $result, $error) = @_;
        if ( defined $error) {
            $self->error($error);
            warn _d($self) . " > error $error\n" if $DEBUG;
            $self->cancel_queue( undef, $error);
            return $error;
        }

        # enough listening, now push
        if ( @{ $self->{queue} }) {
            warn _d($self) . ": listen -> push\n" if $DEBUG;
            $self->push;
            return;
        }

        again;
    }}
}

# True while the respective lambda exists and is waiting.
sub is_pushing   { $_[0]->{queue_pusher} and $_[0]->{queue_pusher}->is_waiting }
sub is_listening { $_[0]->{listener}     and $_[0]->{listener}->is_waiting }

# Starts sending queued messages. Croaks if an error is stored, or if the
# object is already pushing or listening.
sub push
{
    my ( $self) = @_;

    croak "won't start, have errors: $self->{error}" if $self->{error};
    croak "won't start, already pushing"   if $self->is_pushing;
    croak "won't start, already listening" if $self->is_listening;
    warn _d($self) . ": start push\n" if $DEBUG;

    my $q = $self->queue_pusher;
    $q->reset;
    $q->call($self);
    $q->start;
}

# Starts listening for messages initiated by the peer. Does nothing unless
# the object was created with the async option. Croaks if an error is
# stored, or if the object is already pushing or listening.
sub listen
{
    my ( $self) = @_;

    # need explicit consent
    return unless $self->{async};

    croak "won't listen, have errors: $self->{error}" if $self->{error};
    croak "won't listen, already pushing"   if $self->is_pushing;
    croak "won't listen, already listening" if $self->is_listening;
    warn _d($self) . ": start listen\n" if $DEBUG;

    my $q = $self->listener;
    $q->reset;
    $q->call($self);
    $q->start;
}

# cancel all messages, store error on all of them
sub cancel_queue
{
    my ( $self, @reason) = @_;
    return unless $self->{queue};
    for my $q ( @{ $self->{queue} }) {
        my ( $outer, $bind) = @$q;
        $outer->resolve( $bind);
        $outer->terminate( @reason);
    }
    @{ $self->{queue} } = ();
}

# Queues a message and returns a lambda that finishes when the message has
# been answered, or when the queue is cancelled.
# (msg,deadline) :: () -> (result,error)
sub new_message
{
    my ( $self, $msg, $deadline) = @_;

    # NOTE(review): this returns the stored error as the only value, while
    # cancel_queue() callers pass ( undef, $error ) -- confirm that callers
    # expect the error in the first position here.
    return lambda { $self->error } if $self->error;

    warn _d($self) . " > msg ", _t($deadline), " ", length($msg), " bytes\n" if $DEBUG;

    # won't end until we call resolve
    my $outer = IO::Lambda->new;
    my $bind  = $outer->bind;
    CORE::push @{ $self->{queue} }, [ $outer, $bind, $msg, $deadline ];

    # start sending only for the first queued message; later ones are
    # picked up by queue_pusher itself
    $self->push if 1 == @{ $self->{queue} } and not $self->is_listening;

    return $outer;
}

# Condition version of new_message: takes ( messenger, msg, deadline ) from
# the current context. The (&) prototype is what allows the
# `message { ... }` block syntax.
sub message(&) { new_message(context)->condition( shift, \&message, 'message') }

package IO::Lambda::Message::Simple;

# autoflush() in run() is an IO::Handle method
use IO::Handle;

my $debug = $IO::Lambda::DEBUG{message} || 0;

# Debug label for a server object.
sub _d { "simple_msg($_[0])" }

# new $class, $reader [, $writer = $reader]
#
# Creates a blocking dispatcher bound to the given handles.
sub new
{
    my ( $class, $r, $w) = @_;
    $w = $r unless $w;
    my $self = bless {
        r => $r,
        w => $w,
    }, $class;
    warn "new ", _d($self) . "\n" if $debug;
    return $self;
}

# Reads one framed message from the read handle (blocking) and returns its
# payload. Dies on a malformed size line or when the handle cannot be read.
sub read
{
    my $self = $_[0];

    my $size = readline($self->{r});
    die "bad size" unless defined($size) and $size =~ /^[0-9a-f]+\n$/i;
    chop $size;
    # payload plus the terminating newline
    $size = 1 + hex $size;

    # the payload may itself contain newlines, so keep reading lines until
    # the announced number of bytes has been consumed
    my $buf = '';
    while ( $size > 0) {
        my $b = readline($self->{r});
        die "can't read from socket: $!"
            unless defined $b;
        $size -= length($b);
        $buf  .= $b;
    }

    # drop the terminating newline
    chop $buf;

    warn _d($self) . ": ", length($buf), " bytes read\n" if $debug > 1;

    return $buf;
}

# Writes one framed message to the write handle; dies if printf fails.
sub write
{
    my ( $self, $msg) = @_;
    printf( { $self->{w} } "%08x\x0a%s\x0a", length($msg), $msg)
        or die "can't write to socket: $!";
    warn _d($self) . ": ", length($msg), " bytes written\n" if $debug > 1;
}

# Asks run() to stop after the current message.
sub quit { $_[0]->{run} = 0 }

# Message loop: reads a request, decodes it, calls the named method on
# $self and writes back the encoded response, until quit() is called.
#
# A decoded request is expected to be an array reference
# [ method, wantarray, @arguments ]. The response is [ 1, @results ] on
# success and [ 0, error text ] on failure. decode() and encode() are not
# defined in this class and have to be supplied by a derived class.
sub run
{
    my $self = $_[0];

    $self->{run} = 1;
    $self->{w}->autoflush(1);

    while ( $self->{run} ) {
        my ( $msg, $error) = $self->read;
        die "bad message: $error" if defined $error;
        ( $msg, $error) = $self->decode( $msg);

        my $response;
        if ( defined $error) {
            $response = [0, "bad message: $error"];
            warn _d($self) . ": bad message: $error\n" if $debug;
        }
        elsif ( !( $msg and ref($msg) and ref($msg) eq 'ARRAY' and @$msg > 0)) {
            $response = [0, "bad message"];
            warn _d($self) . ": bad message\n" if $debug;
        }
        else {
            # Dispatch is deliberately kept inline: any extra method defined
            # on this class would become callable by the remote side.
            my $method = shift @$msg;

            if ( $self->can($method)) {
                my $wantarray = shift @$msg;
                my @r;
                eval {
                    if ( $wantarray) {
                        @r = $self->$method(@$msg);
                    }
                    else {
                        $r[0] = $self->$method(@$msg);
                    }
                };
                # capture $@ right away, before anything can clobber it
                if ( my $died = $@) {
                    warn _d($self) . ": $method / died $died\n" if $debug;
                    $response = [0, $died];
                    # a method that died stops the loop after replying
                    $self->quit;
                }
                else {
                    warn _d($self) . ": $method / ok\n" if $debug;
                    $response = [1, @r];
                }
            }
            else {
                warn _d($self) . ": no such method: $method\n" if $debug;
                $response = [0, 'no such method'];
            }
        }

        ( $msg, $error) = $self->encode($response);
        if ( defined $error) {
            # the response itself could not be encoded; report that instead
            warn _d($self) . ": encode error $error\n" if $debug;
            ( $msg, $error) = $self->encode([0, $error]);
            die $error if $error;
        }
        $self->write($msg);
    }

    warn _d($self) . " quit\n" if $debug;
}

1;

__DATA__

=pod

=head1 NAME

IO::Lambda::Message - message passing queue

=head1 DESCRIPTION

The module implements a generic message passing protocol, and two generic
classes that implement the server and the client functionality. The server code
is implemented in a simple, blocking fashion, and is expected to be executed
remotely. The client API is written in lambda style, where message completion
can be asynchronously awaited for. The communication between server and client
is done through two file handles of any type ( stream sockets, pipes, etc ).

=head1 SYNOPSIS

    use IO::Lambda::Message qw(message);

    lambda {
       my $messenger = IO::Lambda::Message-> new( \*READER, \*WRITER);
       context $messenger-> new_message('hello world');
    tail {
       print "response1: @_\n";
       context $messenger, 'same thing';
    message {
       print "response2: @_\n";
       undef $messenger;
    }}}

=head1 Message protocol

The message passing protocol featured here is synchronous, which means that any
message initiated either by server or client is expected to be replied to.
Both server and client can wait for the message reply, but they cannot
communicate while waiting.

Messages are prepended with a simple header, that is an 8-digit hexadecimal
length of the message, and 1 byte with value 0x0A (newline). The message is
followed by another 0x0A byte.

=head1 IO::Lambda::Message

The class implements a generic message passing queue, that allows adding
asynchronous messages to the queue, and waiting for the response.

=over

=item new $class, $reader, $writer, %options

Constructs a new object of C<IO::Lambda::Message> class, and attaches to
C<$reader> and C<$writer> file handles ( which can be the same object, and in
which case C<$writer> can be omitted, but only if C<%options> is empty too).
Accepted options:

=over

=item reader :: ($fh, $buf, $cond, $deadline) -> ioresult

Custom reader, C<sysreader> by default.

=item writer :: ($fh, $buf, $length, $offset, $deadline) -> ioresult

Custom writer, C<syswriter> by default.

=item buf :: string

If C<$reader> handle was used (or will need to be used) in buffered I/O,
its buffer can be passed along to the object.

=item async :: boolean

If set, the object will listen for incoming messages from the server, otherwise
it will only initiate outgoing messages. By default set to 0, and the method
C<incoming> that handles incoming messages, dies. This functionality is
designed for derived classes, not for the caller.

=back

=item new_message($message, $deadline = undef) :: () -> ($response, $error)

Registers a new message in the queue. The message must be delivered and replied
to no later than C<$deadline>. Returns a lambda that will be ready when the
message is responded to. The lambda returns the response or the error.

Upon communication error, all queued messages are discarded. Timeout is regarded
as a protocol error too, so use the C<$deadline> option with care.

=item message ($message, $deadline = undef) :: () -> ($response, $error)

Condition version of C<new_message>.

=item cancel_queue(@reason)

Cancels all pending messages, stores C<@reason> in the associated lambdas.

=item error

Returns the last protocol handling error. If set, no new messages are allowed
to be registered, and listening will fail too.

=item is_listening

If set, object is listening for asynchronous events from server.

=item is_pushing

If set, object is sending messages to the server.

=back

=head1 IO::Lambda::Message::Simple

The class implements a simple generic protocol dispatcher, that
executes methods of its own class, and returns the results back
to the client. The methods have to be defined in a derived class.

=over

=item new $reader [$writer = $reader]

Creates a new object that will communicate with clients using
given handles, in a blocking fashion.

=item run

Starts the message loop.

=item quit

Signals the loop to stop.

=back

=head1 SEE ALSO

L<IO::Lambda::DBI>.

=head1 AUTHOR

Dmitry Karasik, E<lt>dmitry@karasik.eu.orgE<gt>.

=cut