1#+############################################################################## 2# # 3# File: Net/STOMP/Client/IO.pm # 4# # 5# Description: Input/Output support for Net::STOMP::Client # 6# # 7#-############################################################################## 8 9# 10# module definition 11# 12 13package Net::STOMP::Client::IO; 14use 5.005; # need the four-argument form of substr() 15use strict; 16use warnings; 17our $VERSION = "2.3"; 18our $REVISION = sprintf("%d.%02d", q$Revision: 2.2 $ =~ /(\d+)\.(\d+)/); 19 20# 21# used modules 22# 23 24use List::Util qw(min); 25use No::Worries::Die qw(dief); 26use No::Worries::Log qw(log_debug); 27use POSIX qw(:errno_h); 28use Time::HiRes qw(); 29 30# 31# constants 32# 33 34use constant READ_LENGTH => 32_768; # chunk size for sysread() 35use constant WRITE_LENGTH => 32_768; # minimum length for syswrite() 36 37#+++############################################################################ 38# # 39# private helpers # 40# # 41#---############################################################################ 42 43# 44# attempt to read data from the socket to the buffer 45# 46# note: we read at least once even if the buffer contains enough data 47# 48# common scenarios: 49# - timeout=undef minlen=undef: loop until we successfully read once 50# - timeout=undef minlen=N: loop until we read at least N bytes 51# - timeout=0 minlen=undef: read only once (successful or not) 52# - timeout=0 minlen=N: loop until we read >=N bytes or fail once 53# - timeout=T minlen=undef: loop until timeout 54# - timeout=T minlen=N: loop until we read >=N bytes or timeout 55# 56 57sub _try_to_read ($$$) { ## no critic 'ProhibitExcessComplexity' 58 my($self, $timeout, $minlen) = @_; 59 my($maxtime, $total, $count, $sleeptime, $remaining); 60 61 $self->{incoming_buflen} = length($self->{incoming_buffer}); 62 # boundary conditions 63 if ($timeout) { 64 return(0) unless $timeout > 0; 65 # timer starts now 66 $maxtime = Time::HiRes::time() + $timeout; 67 } 68 # try to read, in a loop, until we are done 69 $total = 0; 70 while (1) { 71 # attempt to read once 72 $count = sysread($self->{socket}, $self->{incoming_buffer}, 73 READ_LENGTH, $self->{incoming_buflen}); 74 if (defined($count)) { 75 # we could read this time 76 unless ($count) { 77 # ... but we hit the EOF 78 $self->{error} = "cannot sysread(): EOF"; 79 return($total); 80 } 81 # this is a normal successful read 82 $self->{incoming_time} = Time::HiRes::time(); 83 $self->{incoming_buflen} += $count; 84 $total += $count; 85 # check if we have worked enough 86 return($total) unless $minlen and $total < $minlen; 87 } else { 88 # we could not read this time 89 if ($! != EAGAIN and $! != EWOULDBLOCK) { 90 # unexpected error 91 $self->{error} = "cannot sysread(): $!"; 92 return(undef); 93 } 94 } 95 # check time 96 if (not defined($timeout)) { 97 # timeout = undef => loop forever until we are done 98 $sleeptime = 0.01; 99 } elsif ($timeout) { 100 # timeout > 0 => try again only if not too late 101 $remaining = $maxtime - Time::HiRes::time(); 102 return($total) unless $remaining > 0; 103 $sleeptime = min($remaining, 0.01); 104 } else { 105 # timeout = 0 => try again unless last read failed 106 return($total) unless $count; 107 } 108 # sleep a bit... 109 Time::HiRes::sleep($sleeptime) unless $count; 110 } 111} 112 113# 114# attempt to write data from the queue and buffer to the socket 115# 116# common scenarios: 117# - timeout=undef minlen=undef: loop until we successfully write once 118# - timeout=undef minlen=N: loop until we write at least N bytes 119# - timeout=0 minlen=undef: write only once (successful or not) 120# - timeout=0 minlen=N: loop until we write >=N bytes or fail once 121# - timeout=T minlen=undef: loop until timeout 122# - timeout=T minlen=N: loop until we write >=N bytes or timeout 123# 124 125sub _try_to_write ($$$) { ## no critic 'ProhibitExcessComplexity' 126 my($self, $timeout, $minlen) = @_; 127 my($maxtime, $total, $count, $sleeptime, $remaining, $data); 128 129 $self->{outgoing_buflen} = length($self->{outgoing_buffer}); 130 # boundary conditions 131 return(0) unless $self->{outgoing_buflen} or @{ $self->{outgoing_queue} }; 132 if ($timeout) { 133 return(0) unless $timeout > 0; 134 # timer starts now 135 $maxtime = Time::HiRes::time() + $timeout; 136 } 137 # try to write, in a loop, until we are done 138 $total = 0; 139 while (1) { 140 # make sure there is enough data in the outgoing buffer 141 while ($self->{outgoing_buflen} < WRITE_LENGTH 142 and @{ $self->{outgoing_queue} }) { 143 $data = shift(@{ $self->{outgoing_queue} }); 144 $self->{outgoing_buffer} .= ${ $data }; 145 $self->{outgoing_buflen} += length(${ $data }); 146 } 147 return($total) unless $self->{outgoing_buflen}; 148 # attempt to write once 149 $count = syswrite($self->{socket}, $self->{outgoing_buffer}, 150 $self->{outgoing_buflen}); 151 if (defined($count)) { 152 # we could write this time 153 if ($count) { 154 # this is a normal successful write 155 $self->{outgoing_time} = Time::HiRes::time(); 156 $self->{outgoing_buflen} -= $count; 157 $total += $count; 158 substr($self->{outgoing_buffer}, 0, $count, ""); 159 $self->{outgoing_length} -= $count; 160 # check if we have worked enough 161 return($total) unless $self->{outgoing_buflen} 162 or @{ $self->{outgoing_queue} }; 163 return($total) unless $minlen and $total < $minlen; 164 } 165 } else { 166 # we could not write this time 167 if ($! != EAGAIN and $! != EWOULDBLOCK) { 168 # unexpected error 169 $self->{error} = "cannot syswrite(): $!"; 170 return(undef); 171 } 172 } 173 # check time 174 if (not defined($timeout)) { 175 # timeout = undef => loop forever until we are done 176 $sleeptime = 0.01; 177 } elsif ($timeout) { 178 # timeout > 0 => try again only if not too late 179 $remaining = $maxtime - Time::HiRes::time(); 180 return($total) unless $remaining > 0; 181 $sleeptime = min($remaining, 0.01); 182 } else { 183 # timeout = 0 => try again unless last write failed 184 return($total) unless $count; 185 } 186 # sleep a bit... 187 Time::HiRes::sleep($sleeptime) unless $count; 188 } 189} 190 191#+++############################################################################ 192# # 193# object oriented interface # 194# # 195#---############################################################################ 196 197# 198# constructor 199# 200 201sub new : method { 202 my($class, $socket) = @_; 203 my($self); 204 205 dief("missing or invalid socket") 206 unless $socket and ref($socket) and $socket->isa("IO::Socket"); 207 $socket->blocking(0); 208 $self = {}; 209 $self->{socket} = $socket; 210 $self->{incoming_buffer} = ""; 211 $self->{incoming_buflen} = 0; 212 $self->{outgoing_buffer} = ""; 213 $self->{outgoing_buflen} = 0; # buffer length only 214 $self->{outgoing_queue} = []; 215 $self->{outgoing_length} = 0; # buffer + queue length 216 return(bless($self, $class)); 217} 218 219# 220# queue the given data (a scalar reference!) 221# 222 223sub queue_data : method { 224 my($self, $data) = @_; 225 my($length); 226 227 dief("unexpected data: %s", $data) unless ref($data) eq "SCALAR"; 228 $length = length(${ $data }); 229 if ($length) { 230 push(@{ $self->{outgoing_queue} }, $data); 231 $self->{outgoing_length} += $length; 232 } 233 return($self->{outgoing_length}); 234} 235 236# 237# send the queued data 238# 239 240sub send_data : method { 241 my($self, %option) = @_; 242 my($minlen, $count); 243 244 unless ($self->{error}) { 245 # send some data 246 $minlen = $self->{outgoing_length}; 247 $count = _try_to_write($self, $option{timeout}, $minlen); 248 } 249 dief($self->{error}) unless defined($count); 250 # so far so good 251 log_debug("sent %d bytes", $count) 252 if $option{debug} and $option{debug} =~ /\b(io|all)\b/; 253 return($count); 254} 255 256# 257# receive some data 258# 259 260sub receive_data : method { 261 my($self, %option) = @_; 262 my($minlen, $count); 263 264 unless ($self->{error}) { 265 # receive some data 266 $minlen = $option{timeout} ? 1 : undef; 267 $count = _try_to_read($self, $option{timeout}, $minlen); 268 } 269 dief($self->{error}) unless defined($count); 270 # so far so good 271 log_debug("received %d bytes", $count) 272 if $option{debug} and $option{debug} =~ /\b(io|all)\b/; 273 return($count); 274} 275 2761; 277 278__END__ 279 280=head1 NAME 281 282Net::STOMP::Client::IO - Input/Output support for Net::STOMP::Client 283 284=head1 DESCRIPTION 285 286This module provides Input/Output (I/O) support. It is used internally by 287L<Net::STOMP::Client> and should not be directly used elsewhere. 288 289It uses non-blocking I/O: the socket is in non-blocking mode and errors like 290C<EAGAIN> or C<EWOULDBLOCK> are trapped. 291 292=head1 FUNCTIONS 293 294This module provides the following internal methods: 295 296=over 297 298=item new(SOCKET) 299 300return a new Net::STOMP::Client::IO object (class method) 301 302=item queue_data(DATA) 303 304queue (append to the internal outgoing buffer) the given data (a binary 305string reference); return the length of DATA in bytes 306 307=item send_data([OPTIONS]) 308 309send some queued data to the socket; return the total number of bytes 310written 311 312=item receive_data([OPTIONS]) 313 314receive some data from the socket and put it in the internal incoming 315buffer; return the total number of bytes read 316 317=back 318 319=head1 SEE ALSO 320 321L<Net::STOMP::Client>. 322 323=head1 AUTHOR 324 325Lionel Cons L<http://cern.ch/lionel.cons> 326 327Copyright (C) CERN 2010-2017 328