1#+##############################################################################
2#                                                                              #
3# File: Net/STOMP/Client/IO.pm                                                 #
4#                                                                              #
5# Description: Input/Output support for Net::STOMP::Client                     #
6#                                                                              #
7#-##############################################################################
8
9#
10# module definition
11#
12
13package Net::STOMP::Client::IO;
14use 5.005; # need the four-argument form of substr()
15use strict;
16use warnings;
17our $VERSION  = "2.3";
18our $REVISION = sprintf("%d.%02d", q$Revision: 2.2 $ =~ /(\d+)\.(\d+)/);
19
20#
21# used modules
22#
23
24use List::Util qw(min);
25use No::Worries::Die qw(dief);
26use No::Worries::Log qw(log_debug);
27use POSIX qw(:errno_h);
28use Time::HiRes qw();
29
30#
31# constants
32#
33
34use constant READ_LENGTH  => 32_768;  # chunk size for sysread()
35use constant WRITE_LENGTH => 32_768;  # minimum length for syswrite()
36
37#+++############################################################################
38#                                                                              #
39# private helpers                                                              #
40#                                                                              #
41#---############################################################################
42
43#
44# attempt to read data from the socket to the buffer
45#
46# note: we read at least once even if the buffer contains enough data
47#
48# common scenarios:
49#  - timeout=undef minlen=undef: loop until we successfully read once
50#  - timeout=undef minlen=N: loop until we read at least N bytes
51#  - timeout=0     minlen=undef: read only once (successful or not)
52#  - timeout=0     minlen=N: loop until we read >=N bytes or fail once
#  - timeout=T     minlen=undef: loop until we successfully read once or timeout
54#  - timeout=T     minlen=N: loop until we read >=N bytes or timeout
55#
56
sub _try_to_read ($$$) {  ## no critic 'ProhibitExcessComplexity'
    # Read from the socket and append what is received to the incoming
    # buffer, retrying on EAGAIN/EWOULDBLOCK as allowed by the arguments.
    #
    # arguments:
    #  - $self: the I/O object
    #  - $timeout: undef (no time limit), 0 (never wait) or a number of
    #    seconds; a negative value makes the call return 0 immediately
    #  - $minlen: optional number of bytes to accumulate before returning
    #
    # return the number of bytes read by this call, or undef after an
    # unexpected sysread() error (described in $self->{error}); an EOF is
    # also recorded in $self->{error} but the bytes read so far are returned
    my($self, $timeout, $minlen) = @_;
    my($deadline, $nread, $left, $pause);
    my $got = 0;

    # re-synchronise the cached length with the actual buffer contents
    $self->{incoming_buflen} = length($self->{incoming_buffer});
    if ($timeout) {
        # nothing can be done with a negative timeout
        return(0) if !($timeout > 0);
        # the clock starts ticking now
        $deadline = Time::HiRes::time() + $timeout;
    }
    for (;;) {
        # one read attempt, appending at the end of the buffer
        $nread = sysread($self->{socket}, $self->{incoming_buffer},
                         READ_LENGTH, $self->{incoming_buflen});
        if (not defined($nread)) {
            # failure: only "try again later" errors are tolerated
            unless ($! == EAGAIN or $! == EWOULDBLOCK) {
                $self->{error} = "cannot sysread(): $!";
                return(undef);
            }
        } elsif (not $nread) {
            # end of file: report what has been read so far
            $self->{error} = "cannot sysread(): EOF";
            return($got);
        } else {
            # some bytes have been received
            $self->{incoming_time} = Time::HiRes::time();
            $self->{incoming_buflen} += $nread;
            $got += $nread;
            # stop here unless more bytes have been requested
            return($got) if not $minlen or not $got < $minlen;
        }
        # decide if (and for how long) we may wait before trying again
        if (defined($timeout)) {
            if ($timeout) {
                # limited time: give up once the deadline has passed
                $left = $deadline - Time::HiRes::time();
                return($got) unless $left > 0;
                $pause = min($left, 0.01);
            } else {
                # no waiting allowed: give up after a failed attempt
                return($got) unless $nread;
            }
        } else {
            # unlimited time: keep polling
            $pause = 0.01;
        }
        # only pause when the last attempt brought nothing
        Time::HiRes::sleep($pause) unless $nread;
    }
}
112
113#
114# attempt to write data from the queue and buffer to the socket
115#
116# common scenarios:
117#  - timeout=undef minlen=undef: loop until we successfully write once
118#  - timeout=undef minlen=N: loop until we write at least N bytes
119#  - timeout=0     minlen=undef: write only once (successful or not)
120#  - timeout=0     minlen=N: loop until we write >=N bytes or fail once
#  - timeout=T     minlen=undef: loop until we successfully write once or timeout
122#  - timeout=T     minlen=N: loop until we write >=N bytes or timeout
123#
124
sub _try_to_write ($$$) {  ## no critic 'ProhibitExcessComplexity'
    # Write pending data (outgoing buffer first, then queued chunks) to the
    # socket, retrying on EAGAIN/EWOULDBLOCK as allowed by the arguments.
    #
    # arguments:
    #  - $self: the I/O object
    #  - $timeout: undef (no time limit), 0 (never wait) or a number of
    #    seconds; a negative value makes the call return 0 immediately
    #  - $minlen: optional number of bytes to write before returning
    #
    # return the number of bytes written by this call (0 when there is
    # nothing to send), or undef after an unexpected syswrite() error
    # (described in $self->{error})
    my($self, $timeout, $minlen) = @_;
    my($deadline, $nwritten, $left, $pause, $chunk);
    my $queue = $self->{outgoing_queue};
    my $sent = 0;

    # re-synchronise the cached length with the actual buffer contents
    $self->{outgoing_buflen} = length($self->{outgoing_buffer});
    # nothing buffered and nothing queued: nothing to do
    return(0) if not $self->{outgoing_buflen} and not @{ $queue };
    if ($timeout) {
        # nothing can be done with a negative timeout
        return(0) if !($timeout > 0);
        # the clock starts ticking now
        $deadline = Time::HiRes::time() + $timeout;
    }
    for (;;) {
        # move queued chunks to the buffer until it holds WRITE_LENGTH
        # bytes or the queue is exhausted
        until ($self->{outgoing_buflen} >= WRITE_LENGTH or not @{ $queue }) {
            $chunk = shift(@{ $queue });
            $self->{outgoing_buffer} .= ${ $chunk };
            $self->{outgoing_buflen} += length(${ $chunk });
        }
        return($sent) unless $self->{outgoing_buflen};
        # one write attempt with the whole buffer
        $nwritten = syswrite($self->{socket}, $self->{outgoing_buffer},
                             $self->{outgoing_buflen});
        if (not defined($nwritten)) {
            # failure: only "try again later" errors are tolerated
            unless ($! == EAGAIN or $! == EWOULDBLOCK) {
                $self->{error} = "cannot syswrite(): $!";
                return(undef);
            }
        } elsif ($nwritten) {
            # some bytes have been sent: drop them and update the counters
            $self->{outgoing_time} = Time::HiRes::time();
            substr($self->{outgoing_buffer}, 0, $nwritten, "");
            $self->{outgoing_buflen} -= $nwritten;
            $self->{outgoing_length} -= $nwritten;
            $sent += $nwritten;
            # stop when everything has been sent...
            return($sent) if not $self->{outgoing_buflen} and not @{ $queue };
            # ... or when no more bytes have been requested
            return($sent) if not $minlen or not $sent < $minlen;
        }
        # decide if (and for how long) we may wait before trying again
        if (defined($timeout)) {
            if ($timeout) {
                # limited time: give up once the deadline has passed
                $left = $deadline - Time::HiRes::time();
                return($sent) unless $left > 0;
                $pause = min($left, 0.01);
            } else {
                # no waiting allowed: give up after a failed attempt
                return($sent) unless $nwritten;
            }
        } else {
            # unlimited time: keep polling
            $pause = 0.01;
        }
        # only pause when the last attempt wrote nothing
        Time::HiRes::sleep($pause) unless $nwritten;
    }
}
190
191#+++############################################################################
192#                                                                              #
193# object oriented interface                                                    #
194#                                                                              #
195#---############################################################################
196
197#
198# constructor
199#
200
sub new : method {
    # Build a new I/O object around the given socket.
    #
    # arguments:
    #  - $class: the class to bless the new object into
    #  - $socket: an IO::Socket object; it is switched to non-blocking mode
    #
    # return the new object, with empty incoming/outgoing buffers and an
    # empty outgoing queue; die (via dief) if the socket is missing or is
    # not an IO::Socket object
    my($class, $socket) = @_;
    my($self);

    # isa() is probed inside eval so that an unblessed reference is
    # reported as an invalid socket instead of triggering Perl's own
    # "Can't call method on unblessed reference" error
    dief("missing or invalid socket")
        unless $socket and ref($socket)
           and do { local $@; eval { $socket->isa("IO::Socket") } };
    $socket->blocking(0);
    $self = {};
    $self->{socket} = $socket;
    $self->{incoming_buffer} = "";
    $self->{incoming_buflen} = 0;
    $self->{outgoing_buffer} = "";
    $self->{outgoing_buflen} = 0; # buffer length only
    $self->{outgoing_queue} = [];
    $self->{outgoing_length} = 0; # buffer + queue length
    return(bless($self, $class));
}
218
219#
220# queue the given data (a scalar reference!)
221#
222
sub queue_data : method {
    # Append the given data to the outgoing queue.
    #
    # arguments:
    #  - $self: the I/O object
    #  - $data: a reference to the scalar holding the bytes to send; the
    #    reference itself is queued, the string is not copied here
    #
    # return the length of the given data (as documented in the POD), not
    # the total number of pending bytes, which is tracked separately in
    # $self->{outgoing_length}; die (via dief) if $data is not a scalar
    # reference
    my($self, $data) = @_;
    my($length);

    dief("unexpected data: %s", $data) unless ref($data) eq "SCALAR";
    $length = length(${ $data });
    if ($length) {
        # empty strings are not queued at all
        push(@{ $self->{outgoing_queue} }, $data);
        $self->{outgoing_length} += $length;
    }
    return($length);
}
235
236#
237# send the queued data
238#
239
sub send_data : method {
    # Try to send the pending (buffered and queued) data to the socket.
    #
    # options:
    #  - timeout: passed as is to _try_to_write()
    #  - debug: when it contains the word "io" or "all", the number of
    #    bytes sent is logged
    #
    # return the number of bytes written; die (via dief) with the recorded
    # error if one was already pending or if the write attempt failed
    my($self, %option) = @_;
    my($written);

    # an already recorded error is fatal: the socket is not touched again
    if (not $self->{error}) {
        # ask for everything that is currently pending
        $written = _try_to_write($self, $option{timeout},
                                 $self->{outgoing_length});
    }
    dief($self->{error}) unless defined($written);
    # so far so good
    if ($option{debug} and $option{debug} =~ /\b(io|all)\b/) {
        log_debug("sent %d bytes", $written);
    }
    return($written);
}
255
256#
257# receive some data
258#
259
sub receive_data : method {
    # Try to receive some data from the socket into the incoming buffer.
    #
    # options:
    #  - timeout: passed as is to _try_to_read(); when it is true, at
    #    least one byte is requested
    #  - debug: when it contains the word "io" or "all", the number of
    #    bytes received is logged
    #
    # return the number of bytes read; die (via dief) with the recorded
    # error if one was already pending or if the read attempt failed
    my($self, %option) = @_;
    my($wanted, $received);

    # an already recorded error is fatal: the socket is not touched again
    unless ($self->{error}) {
        $wanted = 1 if $option{timeout};
        $received = _try_to_read($self, $option{timeout}, $wanted);
    }
    dief($self->{error}) unless defined($received);
    # so far so good
    if ($option{debug} and $option{debug} =~ /\b(io|all)\b/) {
        log_debug("received %d bytes", $received);
    }
    return($received);
}
275
2761;
277
278__END__
279
280=head1 NAME
281
282Net::STOMP::Client::IO - Input/Output support for Net::STOMP::Client
283
284=head1 DESCRIPTION
285
286This module provides Input/Output (I/O) support. It is used internally by
287L<Net::STOMP::Client> and should not be directly used elsewhere.
288
289It uses non-blocking I/O: the socket is in non-blocking mode and errors like
290C<EAGAIN> or C<EWOULDBLOCK> are trapped.
291
292=head1 FUNCTIONS
293
294This module provides the following internal methods:
295
296=over
297
298=item new(SOCKET)
299
300return a new Net::STOMP::Client::IO object (class method)
301
302=item queue_data(DATA)
303
304queue (append to the internal outgoing buffer) the given data (a binary
305string reference); return the length of DATA in bytes
306
307=item send_data([OPTIONS])
308
309send some queued data to the socket; return the total number of bytes
310written
311
312=item receive_data([OPTIONS])
313
314receive some data from the socket and put it in the internal incoming
315buffer; return the total number of bytes read
316
317=back
318
319=head1 SEE ALSO
320
321L<Net::STOMP::Client>.
322
323=head1 AUTHOR
324
325Lionel Cons L<http://cern.ch/lionel.cons>
326
327Copyright (C) CERN 2010-2017
328