1#
2# Licensed to the Apache Software Foundation (ASF) under one
3# or more contributor license agreements. See the NOTICE file
4# distributed with this work for additional information
5# regarding copyright ownership. The ASF licenses this file
6# to you under the Apache License, Version 2.0 (the
7# "License"); you may not use this file except in compliance
8# with the License. You may obtain a copy of the License at
9#
10#   http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing,
13# software distributed under the License is distributed on an
14# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15# KIND, either express or implied. See the License for the
16# specific language governing permissions and limitations
17# under the License.
18#
19
20use 5.10.0;
21use strict;
22use warnings;
23
24use Thrift;
25use Thrift::Exception;
26use Thrift::Transport;
27
28use IO::Socket::INET;
29use IO::Select;
30
31package Thrift::Socket;
32use base qw( Thrift::Transport );
33use version 0.77; our $VERSION = version->declare("$Thrift::VERSION");
34
#
# Construction and usage
#
#   my %opts   = ( host => 'localhost', port => 9090 );
#   my $socket = Thrift::Socket->new(\%opts);
#
# options (every key of the hash is copied onto the object):
#
#   host        => host to connect to (default 'localhost')
#   port        => port to connect to (default 9090)
#   sendTimeout => timeout used for send and for connect (default 10000)
#   recvTimeout => timeout used for recv (default 10000)
#
# Both timeouts are divided by 1000 before being handed to the socket
# layer, i.e. they are expressed in milliseconds.
#
# Anything that is not a plain hash reference selects the older positional
# form, Thrift::Socket->new($host, $port); a missing or false value falls
# back to the default.
#

sub new
{
    my ($classname, $opts, @positional) = @_;

    # default settings:
    my %settings = (
        host        => 'localhost',
        port        => 9090,
        recvTimeout => 10000,
        sendTimeout => 10000,

        handle      => undef,
    );

    if (defined($opts) && ref($opts) eq 'HASH') {

        # option hash: every supplied key overrides (or extends) the defaults
        @settings{ keys %$opts } = values %$opts;

    }
    else {

        # older style constructor: positional host and port, both optional
        $settings{host} = $opts          || 'localhost';
        $settings{port} = $positional[0] || 9090;

    }

    return bless(\%settings, $classname);
}
79
80
sub setSendTimeout
{
    my ($self, $timeout) = @_;

    # Stored exactly as given; write() and __open() divide it by 1000
    # before handing it to the socket layer.
    return $self->{sendTimeout} = $timeout;
}
88
sub setRecvTimeout
{
    my ($self, $timeout) = @_;

    # Stored exactly as given; __wait() divides it by 1000 before handing
    # it to IO::Select.
    return $self->{recvTimeout} = $timeout;
}
96
97
#
# Tests whether this socket is open.
#
# @return 0 when no handle has been set; otherwise whatever connected()
#         reports for the first socket registered in the IO::Select handle
#
sub isOpen
{
    my ($self) = @_;

    return 0 unless defined $self->{handle};

    my ($sock) = $self->{handle}->handles();
    return $sock->connected;
}
113
#
# Connects the socket.
#
# The connection itself is made by __open(); on success the socket is
# wrapped in an IO::Select object and stored as the handle.
#
# @throws Thrift::TTransportException (NOT_OPEN) when __open() returns a
#         false value
#
sub open
{
    my ($self) = @_;

    my $sock = $self->__open();

    unless ($sock) {
        my $error = sprintf('%s: Could not connect to %s:%s (%s)',
                            ref($self), $self->{host}, $self->{port}, $!);
        die Thrift::TTransportException->new($error, Thrift::TTransportException::NOT_OPEN);
    }

    return $self->{handle} = IO::Select->new( $sock );
}
128
#
# Closes the socket.
#
# Delegates to __close() when a handle is set and does nothing otherwise.
# The handle entry itself is left in place.
#
sub close
{
    my ($self) = @_;

    $self->__close() if defined $self->{handle};
}
139
#
# Reads until $len bytes have been collected, waiting with __wait() and
# receiving with __recv() as many times as necessary.
#
# @param int $len How many bytes
# @return string Binary data; nothing when no handle is set
# @throws Thrift::TTransportException (END_OF_FILE) when a receive yields
#         no data; a timeout raised by __wait() propagates unchanged
#
sub readAll
{
    my ($self, $len) = @_;

    return unless defined $self->{handle};

    my $collected = "";

    for (;;) {
        my $sock  = $self->__wait();
        my $chunk = $self->__recv($sock, $len);

        unless (defined $chunk && $chunk ne '') {
            # $len is the amount still outstanding at this point
            die Thrift::TTransportException->new(ref($self).': Could not read '.$len.' bytes from '.
                               $self->{host}.':'.$self->{port}, Thrift::TTransportException::END_OF_FILE);
        }

        my $got = length($chunk);

        # enough data has arrived: hand back everything gathered so far
        return $collected.$chunk if $got >= $len;

        # short read: keep the piece and ask only for the remainder next time
        $collected .= $chunk;
        $len       -= $got;
    }
}
177
#
# Read from the socket: one __wait() followed by one __recv(), so fewer
# than $len bytes may come back.
#
# @param int $len How many bytes
# @return string Binary data; nothing when no handle is set
# @throws Thrift::TTransportException (END_OF_FILE) when the receive yields
#         no data; a timeout raised by __wait() propagates unchanged
#
sub read
{
    my ($self, $len) = @_;

    return unless defined $self->{handle};

    my $ready = $self->__wait();
    my $data  = $self->__recv($ready, $len);

    return $data if defined $data && $data ne '';

    die Thrift::TTransportException->new(ref($self).': Could not read '.$len.' bytes from '.
                       $self->{host}.':'.$self->{port}, Thrift::TTransportException::END_OF_FILE);
}
203
204
#
# Write to the socket.
#
# Keeps sending until the whole buffer has been accepted: before each
# attempt it waits (up to sendTimeout milliseconds) for the socket to become
# writable, and after a partial send it resumes with the unsent remainder.
#
# @param string $buf The data to write
# @return nothing; also returns immediately when no handle is set
# @throws Thrift::TTransportException (TIMED_OUT) when the socket does not
#         become writable in time
# @throws Thrift::TTransportException (END_OF_FILE) when __send() reports
#         that nothing was sent
#
sub write
{
    my ($self, $buf) = @_;

    return unless defined $self->{handle};

    while (length($buf) > 0) {
        # check for timeout; can_write() takes seconds, sendTimeout is in ms
        my @sockets = $self->{handle}->can_write( $self->{sendTimeout} / 1000 );

        if (@sockets == 0) {
            die Thrift::TTransportException->new(ref($self).': timed out writing to bytes from '.
                                       $self->{host}.':'.$self->{port}, Thrift::TTransportException::TIMED_OUT);
        }

        my $sent = $self->__send($sockets[0], $buf);

        if (!defined $sent || $sent == 0) {
            # report the peer as host:port (the port used to be printed as the host again)
            die Thrift::TTransportException->new(ref($self).': Could not write '.length($buf).' bytes '.
                                 $self->{host}.':'.$self->{port}, Thrift::TTransportException::END_OF_FILE);
        }

        # drop what was accepted and loop for the rest
        $buf = substr($buf, $sent);
    }

    return;
}
238
#
# Flush output to the socket.
#
# @return the result of flush() on the first socket registered in the
#         handle; nothing when no handle is set
#
sub flush
{
    my ($self) = @_;

    return unless defined $self->{handle};

    my ($sock) = $self->{handle}->handles();
    return scalar $sock->flush;
}
250
251###
252### Overridable methods
253###
254
#
# Open a connection to a server.
#
# Makes a TCP connection to host:port through IO::Socket::INET, using
# sendTimeout (converted from milliseconds to seconds) as the timeout.
#
# @returns whatever IO::Socket::INET->new returns
#
sub __open
{
    my ($self) = @_;

    my @peer = (
        PeerAddr => $self->{host},
        PeerPort => $self->{port},
        Proto    => 'tcp',
        Timeout  => $self->{sendTimeout} / 1000,
    );

    return IO::Socket::INET->new(@peer);
}
266
#
# Close the connection
#
# Closes the first socket registered in the IO::Select handle.
#
# @returns the result of CORE::close
#
sub __close
{
    my ($self) = @_;

    my ($sock) = $self->{handle}->handles();
    return CORE::close($sock);
}
275
#
# Read data
#
# @param[in] $sock the socket
# @param[in] $len the length to read
# @returns the data buffer that was read (the return value of the
#          socket's recv() call itself is not inspected)
#
sub __recv
{
    my ($self, $sock, $len) = @_;

    $sock->recv(my $data, $len);

    return $data;
}
292
#
# Send data
#
# @param[in] $sock the socket
# @param[in] $buf the data buffer
# @returns the result of the socket's send() call, which write() treats
#          as the number of bytes written
#
sub __send
{
    my ($self, $sock, $buf) = @_;

    return $sock->send($buf);
}
307
#
# Wait for data to be readable
#
# Waits on the IO::Select handle for at most recvTimeout milliseconds.
#
# @returns a socket that can be read (the first one reported ready)
# @throws Thrift::TTransportException (TIMED_OUT) when nothing is ready
#
sub __wait
{
    my ($self) = @_;

    # can_read() takes seconds
    my $seconds = $self->{recvTimeout} / 1000;
    my @ready   = $self->{handle}->can_read( $seconds );

    unless (@ready) {
        die Thrift::TTransportException->new(ref($self).': timed out reading from '.
                                   $self->{host}.':'.$self->{port}, Thrift::TTransportException::TIMED_OUT);
    }

    return $ready[0];
}
325
326
3271;
328