1# $Id: Message.pm,v 1.14 2009/11/30 14:28:19 dk Exp $
2
3use strict;
4use warnings;
5
6package IO::Lambda::Message;
7
8our $CRLF  = "\x0a";
9our @EXPORT_OK = qw(message);
10our $DEBUG = $IO::Lambda::DEBUG{message} || 0;
11
12use Carp;
13use Exporter;
14use IO::Lambda qw(:all :dev);
15
16sub _d { "message(" . _o($_[0]) . ")" }
17
18sub new
19{
20	my ( $class, $r, $w, %opt ) = @_;
21
22	$opt{reader} ||= sysreader;
23	$opt{writer} ||= syswriter;
24	$opt{buf}    ||= '';
25	$opt{async}  ||= 0;
26
27	croak "Invalid read handle" unless $r;
28	$w = $r unless $w;
29
30	my $self = bless {
31		%opt,
32		r     => $r,
33		w     => $w,
34		queue => [],
35	}, $class;
36
37	warn "new ", _d($self) . "\n" if $DEBUG;
38
39	return $self;
40}
41
42# () :: (self, msg, deadline) -> error
43sub sender
44{
45	$_[0]->{sender} ||= lambda {
46		my ( $self, undef, $deadline) = @_;
47		my $msg = sprintf("%08x%s%s%s", length($_[1]), $CRLF, $_[1], $CRLF);
48		warn _d($self), "msg > [$msg]\n" if $DEBUG > 1;
49	context
50		writebuf($self-> {writer}), $self-> {w},
51		\ $msg, undef, 0, $deadline;
52	tail {
53		$_[1]
54	}}
55}
56
57# () :: (self, deadline) -> (msg, error)
58sub receiver
59{
60	$_[0]->{receiver} ||= lambda {
61		my ( $self, $deadline) = @_;
62	context
63		readbuf($self-> {reader}), $self-> {r}, \$self-> {buf}, 9,
64		$deadline;
65	tail {
66		my ( $size, $error) = @_;
67		return ( undef, $error) if defined $error;
68		$size = substr( $self-> {buf}, 0, 9, '');
69		return ( undef, "protocol error: chunk size not set")
70			unless $size =~ /^[a-f0-9]+$/i;
71
72		chop $size;
73		$size = length($CRLF) + hex $size;
74
75	context
76		readbuf($self-> {reader}), $self-> {r}, \$self-> {buf},
77		$size, $deadline;
78	tail {
79		my $error = $_[1];
80		return ( undef, $error) if defined $error;
81		my $msg = substr( $self-> {buf}, 0, $size, '');
82		chop $msg;
83		warn _d($self), "msg < [$msg]\n" if $DEBUG > 1;
84		return $msg;
85	}}}
86}
87
88# () :: (self, msg, deadline) -> (response, error)
89sub pusher
90{
91	$_[0]->{pusher} ||= lambda {
92		my ( $self, undef, $deadline) = @_;
93		context $self-> sender, $self, $_[1], $deadline;
94	tail {
95		my ( $result, $error) = @_;
96		return ( undef, $error) if defined $error;
97		context $self-> receiver, $self, $deadline;
98		&tail();
99	}}
100}
101
102# () :: (self, deadline) -> error
103sub incoming { die "abstract call" }
104sub puller
105{
106	$_[0]->{puller} ||= lambda {
107		my ( $self, $deadline) = @_;
108		context $self-> receiver, $self, $deadline;
109	tail {
110		my ( $msg, $error) = @_;
111		return ( undef, $error) if defined $error;
112		$msg = $self-> incoming( $msg);
113
114		context $self-> sender, $self, $msg, $deadline;
115		&tail();
116	}}
117}
118
119sub error
120{
121	return $_[0]-> {error} unless $#_;
122	$_[0]-> {error} = $_[1];
123}
124
125# lambda that sends all available messages in queue
126# () :: self -> error
127sub outcoming { $_[1] }
128sub queue_pusher
129{
130	$_[0]->{queue_pusher} ||= lambda {
131		my $self = shift;
132
133		warn _d($self) . ": sending msg ",
134			length($self-> {queue}-> [0]-> [2]), " bytes ",
135			_t($self-> {queue}-> [0]-> [3]),
136			"\n" if $DEBUG;
137		context $self-> pusher,
138			$self,
139			$self-> {queue}-> [0]-> [2],
140			$self-> {queue}-> [0]-> [3];
141	tail {
142		my ( $result, $error) = @_;
143		if ( defined $error) {
144			$self-> error($error);
145			warn _d($self) . " > error $error\n" if $DEBUG;
146			$self-> cancel_queue( undef, $error);
147			return $error;
148		}
149
150		# signal result to the outer lambda
151		my $q = shift @{$self-> {queue}};
152		unless ( $q) {
153			# cancel_queue was called?
154			return;
155		}
156
157		my ( $outer, $bind) = @$q;
158		$outer-> resolve( $bind);
159		$outer-> terminate( $self-> outcoming( $result));
160
161		# stop if it's all
162		unless ( @{$self-> {queue}}) {
163			warn _d($self) . ": push -> listen\n" if $DEBUG;
164			$self-> listen;
165			return;
166		}
167		$q = $self-> {queue}-> [0];
168
169		# fire up the next request
170		warn _d($self) . ": sending msg ",
171			length($q->[2]), " bytes ",
172			_t($q->[3]),
173			"\n" if $DEBUG;
174		context $self-> pusher, $self, $q->[2], $q->[3];
175		again;
176	}}
177}
178
179# () :: self -> error
180sub listener
181{
182	$_[0]->{listener} ||= lambda {
183		my $self = shift;
184		context $self-> puller, $self;
185	tail {
186		my ( $result, $error) = @_;
187		if ( defined $error) {
188			$self-> error($error);
189			warn _d($self) . " > error $error\n" if $DEBUG;
190			$self-> cancel_queue( undef, $error);
191			return $error;
192		}
193
194		# enough listening, now push
195		if ( @{$self-> {queue}}) {
196			warn _d($self) . ": listen -> push\n" if $DEBUG;
197			$self-> push;
198			return;
199		}
200
201		again;
202	}}
203}
204
205sub is_pushing   { $_[0]-> {queue_pusher} and $_[0]-> {queue_pusher}-> is_waiting }
206sub is_listening { $_[0]-> {listener}     and $_[0]-> {listener}->     is_waiting }
207
208sub push
209{
210	my ( $self) = @_;
211
212	croak "won't start, have errors: $self->{error}" if $self-> {error};
213	croak "won't start, already pushing"   if $self-> is_pushing;
214	croak "won't start, already listening" if $self-> is_listening;
215	warn _d($self) . ": start push\n" if $DEBUG;
216
217	my $q = $self-> queue_pusher;
218	$q-> reset;
219	$q-> call($self);
220	$q-> start;
221}
222
223sub listen
224{
225	my ( $self) = @_;
226
227	# need explicit consent
228	return unless $self-> {async};
229
230	croak "won't listen, have errors: $self->{error}" if $self-> {error};
231	croak "won't listen, already pushing"   if $self-> is_pushing;
232	croak "won't listen, already listening" if $self-> is_listening;
233	warn _d($self) . ": start listen\n" if $DEBUG;
234
235	my $q = $self-> listener;
236	$q-> reset;
237	$q-> call($self);
238	$q-> start;
239}
240
241# cancel all messages, store error on all of them
242sub cancel_queue
243{
244	my ( $self, @reason) = @_;
245	return unless $self-> {queue};
246	for my $q ( @{ $self-> {queue}}) {
247		my ( $outer, $bind) = @$q;
248		$outer-> resolve( $bind);
249		$outer-> terminate( @reason);
250	}
251	@{ $self-> {queue} } = ();
252}
253
254# (msg,deadline) :: () -> (result,error)
255sub new_message
256{
257	my ( $self, $msg, $deadline) = @_;
258
259	return lambda { $self-> error } if $self-> error;
260
261	warn _d($self) . " > msg ", _t($deadline), " ", length($msg), " bytes\n" if $DEBUG;
262
263	# won't end until we call resolve
264	my $outer = IO::Lambda-> new;
265	my $bind  = $outer-> bind;
266	CORE::push @{ $self-> {queue} }, [ $outer, $bind, $msg, $deadline ];
267
268	$self-> push if 1 == @{$self-> {queue}} and not $self-> is_listening;
269
270	return $outer;
271}
272
273sub message(&) { new_message(context)-> condition( shift, \&message, 'message') }
274
275package IO::Lambda::Message::Simple;
276
277my $debug = $IO::Lambda::DEBUG{message} || 0;
278
279sub _d { "simple_msg($_[0])" }
280
281sub new
282{
283	my ( $class, $r, $w) = @_;
284	$w = $r unless $w;
285	my $self = bless {
286		r => $r,
287		w => $w,
288	}, $class;
289	warn "new ", _d($self) . "\n" if $debug;
290	return $self;
291}
292
293sub read
294{
295	my $self = $_[0];
296
297	my $size = readline($self-> {r});
298	die "bad size" unless defined($size) and $size =~ /^[0-9a-f]+\n$/i;
299	chop $size;
300	$size = 1 + hex $size;
301
302	my $buf = '';
303	while ( $size > 0) {
304		my $b = readline($self-> {r});
305		die "can't read from socket: $!"
306			unless defined $b;
307		$size -= length($b);
308		$buf .= $b;
309	}
310
311	chop $buf;
312
313	warn _d($self) . ": ", length($buf), " bytes read\n" if $debug > 1;
314
315	return $buf;
316}
317
318sub write
319{
320	my ( $self, $msg) = @_;
321	printf( { $self-> {w} } "%08x\x0a%s\x0a", length($msg), $msg)
322		or die "can't write to socket: $!";
323	warn _d($self) . ": ", length($msg), " bytes written\n" if $debug > 1;
324}
325
326sub quit { $_[0]-> {run} = 0 }
327
328sub run
329{
330	my $self = $_[0];
331
332	$self-> {run} = 1;
333	$self-> {w}-> autoflush(1);
334
335	while ( $self-> {run} ) {
336		my ( $msg, $error) = $self-> read;
337		die "bad message: $error" if defined $error;
338		( $msg, $error) = $self-> decode( $msg);
339
340		my $response;
341		if ( defined $error) {
342			$response = [0, "bad message: $error"];
343			warn _d($self) . ": bad message: $error\n" if $debug;
344			goto SEND;
345		}
346		unless ( $msg and ref($msg) and ref($msg) eq 'ARRAY' and @$msg > 0) {
347			$response = [0, "bad message"];
348			warn _d($self) . ": bad message\n" if $debug;
349			goto SEND;
350		}
351
352		my $method = shift @$msg;
353
354		if ( $self-> can($method)) {
355			my $wantarray = shift @$msg;
356			my @r;
357			eval {
358				if ( $wantarray) {
359					@r    = $self-> $method(@$msg);
360				} else {
361					$r[0] = $self-> $method(@$msg);
362				}
363			};
364			if ( $@) {
365				warn _d($self) . ": $method / died $@\n" if $debug;
366				$response = [0, $@];
367				$self-> quit;
368			} else {
369				warn _d($self) . ": $method / ok\n" if $debug;
370				$response = [1, @r];
371			}
372		} else {
373			warn _d($self) . ": no such method: $method\n" if $debug;
374			$response = [0, 'no such method'];
375		};
376	SEND:
377		( $msg, $error) = $self-> encode($response);
378		if ( defined $error) {
379			warn _d($self) . ": encode error $error\n" if $debug;
380			( $msg, $error) = $self-> encode([0, $error]);
381			die $error if $error;
382		}
383		$self-> write($msg);
384	}
385
386	warn _d($self) . " quit\n" if $debug;
387}
388
3891;
390
391__DATA__
392
393=pod
394
395=head1 NAME
396
397IO::Lambda::Message - message passing queue
398
399=head1 DESCRIPTION
400
401The module implements a generic message passing protocol, and two generic
402classes that implement the server and the client functionality. The server code
403is implemented in a simple, blocking fashion, and is expected to be executed
404remotely. The client API is written in lambda style, where message completion
405can be asynchronously awaited for. The communication between server and client
406is done through two file handles of any type ( stream sockets, pipes, etc ).
407
408=head1 SYNOPSIS
409
410    use IO::Lambda::Message qw(message);
411
412    lambda {
413       my $messenger = IO::Lambda::Message-> new( \*READER, \*WRITER);
414       context $messenger-> new_message('hello world');
415    tail {
416       print "response1: @_, "\n";
417       context $messenger, 'same thing';
418    message {
419       print "response2: @_, "\n";
420       undef $messenger;
421    }}}
422
423=head1 Message protocol
424
425The message passing protocol featured here is synchronous, which means that any
426message initiated either by server or client is expected to be replied to.
427Both server and client can wait for the message reply, but they cannot
428communicate while waiting.
429
430Messages are prepended with simple header, that is a 8-digit hexadecimal length
431of the message, and 1 byte with value 0x0A (newline).  After the message
432another 0x0A byte is followed.
433
434=head1 IO::Lambda::Message
435
436The class implements a generic message passing queue, that allows adding
437asynchronous messages to the queue, and wait for the response.
438
439=over
440
441=item new $class, $reader, $writer, %options
442
443Constructs a new object of C<IO::Lambda::Message> class, and attaches to
444C<$reader> and C<$writer> file handles ( which can be the same object, and in
445which case C<$writer> can be omitted, but only if C<%options> is empty too).
446Accepted options:
447
448=over
449
450=item reader :: ($fh, $buf, $cond, $deadline) -> ioresult
451
452Custom reader, C<sysreader> by default.
453
454=item writer :: ($fh, $buf, $length, $offset, $deadline) -> ioresult
455
456Custom writer, C<syswriter> by default.
457
458=item buf :: string
459
460If C<$reader> handle was used (or will be needed to be used) in buffered I/O,
461its buffer can be passed along to the object.
462
463=item async :: boolean
464
465If set, the object will listen for incoming messages from the server, otherwise
466it will only initiate outcoming messages. By default set to 0, and the method
467C<incoming> that handles incoming messages, dies. This functionality is
468designed for derived classes, not for the caller.
469
470=back
471
472=item new_message($message, $deadline = undef) :: () -> ($response, $error)
473
474Registers a new message in the queue. The message must be delivered and replied
475to no later than C<$deadline>, and returns a lambda that will be ready when the
476message is responded to. The lambda returns the response or the error.
477
478Upon communication error, all queued messages are discarded.  Timeout is regarded
479as a protocol error too, so use the C<$deadline> option with care.
480
481=item message ($message, $deadline = undef) :: () -> ($response, $error)
482
483Condition version of C<new_message>.
484
485=item cancel_queue(@reason)
486
487Cancels all pending messages, stores C<@reason> in the associated lambdas.
488
489=item error
490
491Returns the last protocol handling error. If set, no new messages are allowed
492to be registered, and listening will fail too.
493
494=item is_listening
495
496If set, object is listening for asynchronous events from server.
497
498=item is_pushing
499
500If set, object is sending messages to the server.
501
502=back
503
504=head1 IO::Lambda::Message::Simple
505
506The class implements a simple generic protocol dispatcher, that
507executes methods of its own class, and returns the results back
508to the client. The methods have to be defined in a derived class.
509
510=over
511
512=item new $reader [$writer = $reader]
513
514Creates a new object that will communicate with clients using
515given handles, in a blocking fashion.
516
517=item run
518
519Starts the message loop
520
521=item quit
522
523Signals the loop to stop
524
525=back
526
527=head1 SEE ALSO
528
529L<IO::Lambda::DBI>.
530
531=head1 AUTHOR
532
533Dmitry Karasik, E<lt>dmitry@karasik.eu.orgE<gt>.
534
535=cut
536