1# 2# Licensed to the Apache Software Foundation (ASF) under one 3# or more contributor license agreements. See the NOTICE file 4# distributed with this work for additional information 5# regarding copyright ownership. The ASF licenses this file 6# to you under the Apache License, Version 2.0 (the 7# "License"); you may not use this file except in compliance 8# with the License. You may obtain a copy of the License at 9# 10# http://www.apache.org/licenses/LICENSE-2.0 11# 12# Unless required by applicable law or agreed to in writing, 13# software distributed under the License is distributed on an 14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 15# KIND, either express or implied. See the License for the 16# specific language governing permissions and limitations 17# under the License. 18# 19 20use 5.10.0; 21use strict; 22use warnings; 23 24use Thrift; 25use Thrift::Exception; 26use Thrift::Transport; 27 28use IO::Socket::INET; 29use IO::Select; 30 31package Thrift::Socket; 32use base qw( Thrift::Transport ); 33use version 0.77; our $VERSION = version->declare("$Thrift::VERSION"); 34 35# 36# Construction and usage 37# 38# my $opts = {} 39# my $socket = Thrift::Socket->new(\%opts); 40# 41# options: 42# 43# host => host to connect to 44# port => port to connect to 45# sendTimeout => timeout used for send and for connect 46# recvTimeout => timeout used for recv 47# 48 49sub new 50{ 51 my $classname = shift; 52 my $opts = shift; 53 54 # default settings: 55 my $self = { 56 host => 'localhost', 57 port => 9090, 58 recvTimeout => 10000, 59 sendTimeout => 10000, 60 61 handle => undef 62 }; 63 64 if (defined $opts and ref $opts eq ref {}) { 65 66 # argument is a hash of options so override the defaults 67 $self->{$_} = $opts->{$_} for keys %$opts; 68 69 } else { 70 71 # older style constructor takes 3 arguments, none of which are required 72 $self->{host} = $opts || 'localhost'; 73 $self->{port} = shift || 9090; 74 75 } 76 77 return bless($self,$classname); 78} 79 80 81sub setSendTimeout 82{ 83 my $self = shift; 84 my $timeout = shift; 85 86 $self->{sendTimeout} = $timeout; 87} 88 89sub setRecvTimeout 90{ 91 my $self = shift; 92 my $timeout = shift; 93 94 $self->{recvTimeout} = $timeout; 95} 96 97 98# 99# Tests whether this is open 100# 101# @return bool true if the socket is open 102# 103sub isOpen 104{ 105 my $self = shift; 106 107 if( defined $self->{handle} ){ 108 return ($self->{handle}->handles())[0]->connected; 109 } 110 111 return 0; 112} 113 114# 115# Connects the socket. 116# 117sub open 118{ 119 my $self = shift; 120 121 my $sock = $self->__open() || do { 122 my $error = ref($self).': Could not connect to '.$self->{host}.':'.$self->{port}.' ('.$!.')'; 123 die Thrift::TTransportException->new($error, Thrift::TTransportException::NOT_OPEN); 124 }; 125 126 $self->{handle} = IO::Select->new( $sock ); 127} 128 129# 130# Closes the socket. 131# 132sub close 133{ 134 my $self = shift; 135 if( defined $self->{handle} ) { 136 $self->__close(); 137 } 138} 139 140# 141# Uses stream get contents to do the reading 142# 143# @param int $len How many bytes 144# @return string Binary data 145# 146sub readAll 147{ 148 my $self = shift; 149 my $len = shift; 150 151 152 return unless defined $self->{handle}; 153 154 my $pre = ""; 155 while (1) { 156 157 my $sock = $self->__wait(); 158 my $buf = $self->__recv($sock, $len); 159 160 if (!defined $buf || $buf eq '') { 161 162 die Thrift::TTransportException->new(ref($self).': Could not read '.$len.' bytes from '. 163 $self->{host}.':'.$self->{port}, Thrift::TTransportException::END_OF_FILE); 164 165 } 166 elsif ((my $sz = length($buf)) < $len) { 167 168 $pre .= $buf; 169 $len -= $sz; 170 171 } 172 else { 173 return $pre.$buf; 174 } 175 } 176} 177 178# 179# Read from the socket 180# 181# @param int $len How many bytes 182# @return string Binary data 183# 184sub read 185{ 186 my $self = shift; 187 my $len = shift; 188 189 return unless defined $self->{handle}; 190 191 my $sock = $self->__wait(); 192 my $buf = $self->__recv($sock, $len); 193 194 if (!defined $buf || $buf eq '') { 195 196 die Thrift::TTransportException->new(ref($self).': Could not read '.$len.' bytes from '. 197 $self->{host}.':'.$self->{port}, Thrift::TTransportException::END_OF_FILE); 198 199 } 200 201 return $buf; 202} 203 204 205# 206# Write to the socket. 207# 208# @param string $buf The data to write 209# 210sub write 211{ 212 my $self = shift; 213 my $buf = shift; 214 215 return unless defined $self->{handle}; 216 217 while (length($buf) > 0) { 218 #check for timeout 219 my @sockets = $self->{handle}->can_write( $self->{sendTimeout} / 1000 ); 220 221 if(@sockets == 0){ 222 die Thrift::TTransportException->new(ref($self).': timed out writing to bytes from '. 223 $self->{host}.':'.$self->{port}, Thrift::TTransportException::TIMED_OUT); 224 } 225 226 my $sent = $self->__send($sockets[0], $buf); 227 228 if (!defined $sent || $sent == 0 ) { 229 230 die Thrift::TTransportException->new(ref($self).': Could not write '.length($buf).' bytes '. 231 $self->{host}.':'.$self->{host}, Thrift::TTransportException::END_OF_FILE); 232 233 } 234 235 $buf = substr($buf, $sent); 236 } 237} 238 239# 240# Flush output to the socket. 241# 242sub flush 243{ 244 my $self = shift; 245 246 return unless defined $self->{handle}; 247 248 my $ret = ($self->{handle}->handles())[0]->flush; 249} 250 251### 252### Overridable methods 253### 254 255# 256# Open a connection to a server. 257# 258sub __open 259{ 260 my $self = shift; 261 return IO::Socket::INET->new(PeerAddr => $self->{host}, 262 PeerPort => $self->{port}, 263 Proto => 'tcp', 264 Timeout => $self->{sendTimeout} / 1000); 265} 266 267# 268# Close the connection 269# 270sub __close 271{ 272 my $self = shift; 273 CORE::close(($self->{handle}->handles())[0]); 274} 275 276# 277# Read data 278# 279# @param[in] $sock the socket 280# @param[in] $len the length to read 281# @returns the data buffer that was read 282# 283sub __recv 284{ 285 my $self = shift; 286 my $sock = shift; 287 my $len = shift; 288 my $buf = undef; 289 $sock->recv($buf, $len); 290 return $buf; 291} 292 293# 294# Send data 295# 296# @param[in] $sock the socket 297# @param[in] $buf the data buffer 298# @returns the number of bytes written 299# 300sub __send 301{ 302 my $self = shift; 303 my $sock = shift; 304 my $buf = shift; 305 return $sock->send($buf); 306} 307 308# 309# Wait for data to be readable 310# 311# @returns a socket that can be read 312# 313sub __wait 314{ 315 my $self = shift; 316 my @sockets = $self->{handle}->can_read( $self->{recvTimeout} / 1000 ); 317 318 if (@sockets == 0) { 319 die Thrift::TTransportException->new(ref($self).': timed out reading from '. 320 $self->{host}.':'.$self->{port}, Thrift::TTransportException::TIMED_OUT); 321 } 322 323 return $sockets[0]; 324} 325 326 3271; 328