1#  Copyright 2014 - present MongoDB, Inc.
2#
3#  Licensed under the Apache License, Version 2.0 (the "License");
4#  you may not use this file except in compliance with the License.
5#  You may obtain a copy of the License at
6#
7#  http://www.apache.org/licenses/LICENSE-2.0
8#
9#  Unless required by applicable law or agreed to in writing, software
10#  distributed under the License is distributed on an "AS IS" BASIS,
11#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12#  See the License for the specific language governing permissions and
13#  limitations under the License.
14
15# Some portions of this code were copied and adapted from the Perl module
16# HTTP::Tiny, which is copyright Christian Hansen, David Golden and other
17# contributors and used with permission under the terms of the Artistic License
18
19use v5.8.0;
20use strict;
21use warnings;
22
23package MongoDB::_Link;
24
25use version;
26our $VERSION = 'v2.2.2';
27
28use Moo;
29use Errno qw[EINTR EPIPE];
30use IO::Socket qw[SOCK_STREAM];
31use Scalar::Util qw/refaddr/;
32use Socket qw/SOL_SOCKET SO_KEEPALIVE SO_RCVBUF IPPROTO_TCP TCP_NODELAY AF_INET/;
33use Time::HiRes qw/time/;
34use MongoDB::Error;
35use MongoDB::_Constants;
36use MongoDB::_Protocol;
37use MongoDB::_Types qw(
38    Boolish
39    HostAddress
40    NonNegNum
41    Numish
42    ServerDesc
43);
44use Types::Standard qw(
45    HashRef
46    Maybe
47    Str
48    Undef
49);
50use namespace::clean;
51
52my $SOCKET_CLASS =
53  eval { require IO::Socket::IP; IO::Socket::IP->VERSION(0.32) }
54  ? 'IO::Socket::IP'
55  : 'IO::Socket::INET';
56
57has address => (
58    is => 'ro',
59    required => 1,
60    isa => HostAddress,
61);
62
63has connect_timeout => (
64    is => 'ro',
65    default => 20,
66    isa => Numish,
67);
68
69has socket_timeout => (
70    is => 'ro',
71    default => 30,
72    isa => Numish|Undef,
73);
74
75has with_ssl => (
76    is => 'ro',
77    isa => Boolish,
78);
79
80has SSL_options => (
81    is => 'ro',
82    default => sub { {} },
83    isa => HashRef,
84);
85
86has server => (
87    is => 'rwp',
88    init_arg => undef,
89    isa => Maybe[ServerDesc],
90);
91
92has host => (
93    is => 'lazy',
94    init_arg => undef,
95    isa => Str,
96);
97
98sub _build_host {
99    my ($self) = @_;
100    my ($host, $port) = split /:/, $self->address;
101    return $host;
102}
103
104my @is_master_fields= qw(
105  min_wire_version max_wire_version
106  max_message_size_bytes max_write_batch_size max_bson_object_size
107);
108
109for my $f ( @is_master_fields ) {
110    has $f => (
111        is => 'rwp',
112        init_arg => undef,
113        isa => Maybe[NonNegNum],
114    );
115}
116
117# wire version >= 2
118has supports_write_commands => (
119    is => 'rwp',
120    init_arg => undef,
121    isa => Boolish,
122);
123
124# wire version >= 3
125has supports_list_commands => (
126    is => 'rwp',
127    init_arg => undef,
128    isa => Boolish,
129);
130
131has supports_scram_sha1 => (
132    is => 'rwp',
133    init_arg => undef,
134    isa => Boolish,
135);
136
137# wire version >= 4
138has supports_document_validation => (
139    is => 'rwp',
140    init_arg => undef,
141    isa => Boolish,
142);
143
144has supports_explain_command => (
145    is => 'rwp',
146    init_arg => undef,
147    isa => Boolish,
148);
149
150has supports_query_commands => (
151    is => 'rwp',
152    init_arg => undef,
153    isa => Boolish,
154);
155
156has supports_find_modify_write_concern => (
157    is => 'rwp',
158    init_arg => undef,
159    isa => Boolish,
160);
161
162has supports_fsync_command => (
163    is => 'rwp',
164    init_arg => undef,
165    isa => Boolish,
166);
167
168has supports_read_concern => (
169    is => 'rwp',
170    init_arg => undef,
171    isa => Boolish,
172);
173
174# wire version >= 5
175has supports_collation => (
176    is => 'rwp',
177    init_arg => undef,
178    isa => Boolish,
179);
180
181has supports_helper_write_concern => (
182    is => 'rwp',
183    init_arg => undef,
184    isa => Boolish,
185);
186
187has supports_x509_user_from_cert => (
188    is => 'rwp',
189    init_arg => undef,
190    isa => Boolish,
191);
192
193# for caching wire version >=6
194has supports_arrayFilters => (
195    is => 'rwp',
196    init_arg => undef,
197    isa => Boolish,
198);
199
200has supports_clusterTime => (
201    is => 'rwp',
202    init_arg => undef,
203    isa => Boolish,
204);
205
206has supports_db_aggregation => (
207    is => 'rwp',
208    init_arg => undef,
209    isa => Boolish,
210);
211
212has supports_retryWrites => (
213    is => 'rwp',
214    init_arg => undef,
215    isa => Boolish,
216);
217
218has supports_op_msg => (
219    is => 'rwp',
220    init_arg => undef,
221    isa => Boolish,
222);
223
224has supports_retryReads => (
225    is => 'rwp',
226    init_arg => undef,
227    isa => Boolish,
228);
229
230# for wire version >= 7
231has supports_4_0_changestreams => (
232    is => 'rwp',
233    init_arg => undef,
234    isa => Boolish,
235  );
236
237# wire version >= 8
238has supports_aggregate_out_read_concern => (
239    is => 'rwp',
240    init_arg => undef,
241    isa => Boolish,
242);
243
244my @connection_state_fields = qw(
245    fh connected rcvbuf last_used fdset is_ssl
246);
247
248for my $f ( @connection_state_fields ) {
249    has $f => (
250        is => 'rwp',
251        clearer => "_clear_$f",
252        init_arg => undef,
253    );
254}
255
256around BUILDARGS => sub {
257    my $orig = shift;
258    my $class = shift;
259    my $hr = $class->$orig(@_);
260
261    # shortcut on missing required field
262    return $hr unless exists $hr->{address};
263
264    ($hr->{host}, $hr->{port}) = split /:/, $hr->{address};
265
266    return $hr;
267};
268
269sub connect {
270    @_ == 1 || MongoDB::UsageError->throw( q/Usage: $handle->connect()/ . "\n" );
271    my ($self) = @_;
272
273    if ( $self->with_ssl ) {
274        $self->_assert_ssl;
275        # XXX possibly make SOCKET_CLASS an instance variable and set it here to IO::Socket::SSL
276    }
277
278    my ($host, $port) = split /:/, $self->address;
279
280    # PERL-715: For 'localhost' where MongoDB is only listening on IPv4 and
281    # getaddrinfo returns an IPv6 address before an IPv4 address, some
282    # operating systems tickle a bug in IO::Socket::IP that causes
283    # connection attempts to fail before trying the IPv4 address.  As a
284    # workaround, we always force 'localhost' to use IPv4.
285
286    my $fh = $SOCKET_CLASS->new(
287        PeerHost => $ENV{TEST_MONGO_SOCKET_HOST} || $host,
288        PeerPort => $port,
289        ( lc($host) eq 'localhost' ? ( Family => AF_INET ) : () ),
290        Proto    => 'tcp',
291        Type     => SOCK_STREAM,
292        Timeout  => $self->connect_timeout >= 0 ? $self->connect_timeout : undef,
293      )
294      or
295      MongoDB::NetworkError->throw(qq/Could not connect to '@{[$self->address]}': $@\n/);
296
297    unless ( binmode($fh) ) {
298        undef $fh;
299        MongoDB::InternalError->throw(qq/Could not binmode() socket: '$!'\n/);
300    }
301
302    unless ( defined( $fh->setsockopt( IPPROTO_TCP, TCP_NODELAY, 1 ) ) ) {
303        undef $fh;
304        MongoDB::InternalError->throw(qq/Could not set TCP_NODELAY on socket: '$!'\n/);
305    }
306
307    unless ( defined( $fh->setsockopt( SOL_SOCKET, SO_KEEPALIVE, 1 ) ) ) {
308        undef $fh;
309        MongoDB::InternalError->throw(qq/Could not set SO_KEEPALIVE on socket: '$!'\n/);
310    }
311
312    $self->_set_fh($fh);
313    $self->_set_connected(1);
314
315    my $fd = fileno $fh;
316    unless ( defined $fd && $fd >= 0 ) {
317        $self->_close;
318        MongoDB::InternalError->throw(qq/select(2): 'Bad file descriptor'\n/);
319    }
320    vec( my $fdset = '', $fd, 1 ) = 1;
321    $self->_set_fdset( $fdset );
322
323    $self->start_ssl($host) if $self->with_ssl;
324
325    $self->_set_last_used( time );
326    $self->_set_rcvbuf( $fh->sockopt(SO_RCVBUF) );
327
328    # Default max msg size is 2 * max BSON object size (DRIVERS-1)
329    $self->_set_max_message_size_bytes( 2 * MAX_BSON_OBJECT_SIZE );
330
331    return $self;
332}
333
334sub set_metadata {
335    my ( $self, $server ) = @_;
336    $self->_set_server($server);
337    $self->_set_min_wire_version( $server->is_master->{minWireVersion} || "0" );
338    $self->_set_max_wire_version( $server->is_master->{maxWireVersion} || "0" );
339    $self->_set_max_bson_object_size( $server->is_master->{maxBsonObjectSize}
340          || MAX_BSON_OBJECT_SIZE );
341    $self->_set_max_write_batch_size( $server->is_master->{maxWriteBatchSize}
342          || MAX_WRITE_BATCH_SIZE );
343
344    # Default is 2 * max BSON object size (DRIVERS-1)
345    $self->_set_max_message_size_bytes( $server->is_master->{maxMessageSizeBytes}
346          || 2 * $self->max_bson_object_size );
347
348    if ( $self->accepts_wire_version(2) ) {
349        $self->_set_supports_write_commands(1);
350    }
351    if ( $self->accepts_wire_version(3) ) {
352        $self->_set_supports_list_commands(1);
353        $self->_set_supports_scram_sha1(1);
354    }
355    if ( $self->accepts_wire_version(4) ) {
356        $self->_set_supports_document_validation(1);
357        $self->_set_supports_explain_command(1);
358        $self->_set_supports_query_commands(1);
359        $self->_set_supports_find_modify_write_concern(1);
360        $self->_set_supports_fsync_command(1);
361        $self->_set_supports_read_concern(1);
362    }
363    if ( $self->accepts_wire_version(5) ) {
364        $self->_set_supports_collation(1);
365        $self->_set_supports_helper_write_concern(1);
366        $self->_set_supports_x509_user_from_cert(1);
367    }
368    if ( $self->accepts_wire_version(6) ) {
369        $self->_set_supports_arrayFilters(1);
370        $self->_set_supports_clusterTime(1);
371        $self->_set_supports_db_aggregation(1);
372        $self->_set_supports_retryWrites(
373            defined( $server->logical_session_timeout_minutes )
374              && ( $server->type ne 'Standalone' )
375            ? 1
376            : 0
377        );
378        $self->_set_supports_op_msg(1);
379        $self->_set_supports_retryReads(1);
380    }
381    if ( $self->accepts_wire_version(7) ) {
382        $self->_set_supports_4_0_changestreams(1);
383    }
384    if ( $self->accepts_wire_version(8) ) {
385        $self->_set_supports_aggregate_out_read_concern(1);
386    }
387
388    return;
389}
390
391sub accepts_wire_version {
392    my ( $self, $version ) = @_;
393    my $min = $self->min_wire_version || 0;
394    my $max = $self->max_wire_version || 0;
395    return $version >= $min && $version <= $max;
396}
397
398sub start_ssl {
399    my ( $self, $host ) = @_;
400
401    my $ssl_args = $self->_ssl_args($host);
402    IO::Socket::SSL->start_SSL(
403        $self->fh,
404        %$ssl_args,
405        SSL_create_ctx_callback => sub {
406            my $ctx = shift;
407            Net::SSLeay::CTX_set_mode( $ctx, Net::SSLeay::MODE_AUTO_RETRY() );
408        },
409    );
410
411    unless ( ref( $self->fh ) eq 'IO::Socket::SSL' ) {
412        my $ssl_err = IO::Socket::SSL->errstr;
413        $self->_close;
414        MongoDB::HandshakeError->throw(qq/SSL connection failed for $host: $ssl_err\n/);
415    }
416}
417
418sub client_certificate_subject {
419    my ($self) = @_;
420    return "" unless $self->fh && $self->fh->isa("IO::Socket::SSL");
421
422    my $client_cert = $self->fh->sock_certificate()
423      or return "";
424
425    my $subject_raw = Net::SSLeay::X509_get_subject_name($client_cert)
426      or return "";
427
428    my $subject =
429      Net::SSLeay::X509_NAME_print_ex( $subject_raw, Net::SSLeay::XN_FLAG_RFC2253() );
430
431    return $subject;
432}
433
434sub close {
435    my ($self) = @_;
436    $self->_close
437      or MongoDB::NetworkError->throw(qq/Error closing socket: '$!'\n/);
438}
439
440# this is a quiet close so preexisting network errors can be thrown
441sub _close {
442    my ($self) = @_;
443    $self->_clear_connected;
444    my $ok = 1;
445    if ( $self->fh ) {
446        $ok = CORE::close( $self->fh );
447        $self->_clear_fh;
448    }
449    return $ok;
450}
451
452sub is_connected {
453    my ($self) = @_;
454    return $self->connected && $self->fh;
455}
456
457sub write {
458    my ( $self, $buf, $write_opt ) = @_;
459    $write_opt ||= {};
460
461    if (
462        !$write_opt->{disable_compression}
463        && $self->server
464        && $self->server->compressor
465    ) {
466        $buf = MongoDB::_Protocol::compress(
467            $buf,
468            $self->server->compressor,
469        );
470    }
471
472    my ( $len, $off, $pending, $nfound, $r ) = ( length($buf), 0 );
473
474    MongoDB::ProtocolError->throw(
475        qq/Message of size $len exceeds maximum of / . $self->{max_message_size_bytes} )
476      if $len > $self->max_message_size_bytes;
477
478    local $SIG{PIPE} = 'IGNORE';
479
480    while () {
481
482        # do timeout
483        ( $pending, $nfound ) = ( $self->socket_timeout, 0 );
484        TIMEOUT: while () {
485            if ( -1 == ( $nfound = select( undef, $self->fdset, undef, $pending ) ) ) {
486                unless ( $! == EINTR ) {
487                    $self->_close;
488                    MongoDB::NetworkError->throw(qq/select(2): '$!'\n/);
489                }
490                # to avoid overhead tracking monotonic clock times; assume
491                # interrupts occur on average halfway through the timeout period
492                # and restart with half the original time
493                $pending = int( $pending / 2 );
494                redo TIMEOUT;
495            }
496            last TIMEOUT;
497        }
498        unless ($nfound) {
499            $self->_close;
500            MongoDB::NetworkTimeout->throw(
501                qq/Timed out while waiting for socket to become ready for writing\n/);
502        }
503
504        # do write
505        if ( defined( $r = syswrite( $self->fh, $buf, $len, $off ) ) ) {
506            ( $len -= $r ), ( $off += $r );
507            last unless $len > 0;
508        }
509        elsif ( $! == EPIPE ) {
510            $self->_close;
511            MongoDB::NetworkError->throw(qq/Socket closed by remote server: $!\n/);
512        }
513        elsif ( $! != EINTR ) {
514            if ( $self->fh->can('errstr') ) {
515                my $err = $self->fh->errstr();
516                $self->_close;
517                MongoDB::NetworkError->throw(qq/Could not write to SSL socket: '$err'\n /);
518            }
519            else {
520                $self->_close;
521                MongoDB::NetworkError->throw(qq/Could not write to socket: '$!'\n/);
522            }
523
524        }
525    }
526
527    $self->_set_last_used(time);
528
529    return;
530}
531
532sub read {
533    my ($self) = @_;
534
535    # len of undef triggers first pass through loop
536    my ( $msg, $len, $pending, $nfound, $r ) = ( '', undef );
537
538    while () {
539
540        # do timeout
541        ( $pending, $nfound ) = ( $self->socket_timeout, 0 );
542        TIMEOUT: while () {
543            # no need to select if SSL and has pending data from a frame
544            if ( $self->with_ssl ) {
545                ( $nfound = 1 ), last TIMEOUT
546                  if $self->fh->pending;
547            }
548
549            if ( -1 == ( $nfound = select( $self->fdset, undef, undef, $pending ) ) ) {
550                unless ( $! == EINTR ) {
551                    $self->_close;
552                    MongoDB::NetworkError->throw(qq/select(2): '$!'\n/);
553                }
554                # to avoid overhead tracking monotonic clock times; assume
555                # interrupts occur on average halfway through the timeout period
556                # and restart with half the original time
557                $pending = int( $pending / 2 );
558                redo TIMEOUT;
559            }
560            last TIMEOUT;
561        }
562        unless ($nfound) {
563            $self->_close;
564            MongoDB::NetworkTimeout->throw(
565                q/Timed out while waiting for socket to become ready for reading/ . "\n" );
566        }
567
568        # read up to SO_RCVBUF if we can
569        if ( defined( $r = sysread( $self->fh, $msg, $self->rcvbuf, length $msg ) ) ) {
570            # because select said we're ready to read, if we read 0 then
571            # we got EOF before the full message
572            if ( !$r ) {
573                $self->_close;
574                MongoDB::NetworkError->throw(qq/Unexpected end of stream\n/);
575            }
576        }
577        elsif ( $! != EINTR ) {
578            if ( $self->fh->can('errstr') ) {
579                my $err = $self->fh->errstr();
580                $self->_close;
581                MongoDB::NetworkError->throw(qq/Could not read from SSL socket: '$err'\n /);
582            }
583            else {
584                $self->_close;
585                MongoDB::NetworkError->throw(qq/Could not read from socket: '$!'\n/);
586            }
587        }
588
589        if ( !defined $len ) {
590            next if length($msg) < 4;
591            $len = unpack( P_INT32, $msg );
592            MongoDB::ProtocolError->throw(
593                qq/Server reply of size $len exceeds maximum of / . $self->{max_message_size_bytes} )
594              if $len > $self->max_message_size_bytes;
595        }
596        last unless length($msg) < $len;
597    }
598
599    $self->_set_last_used(time);
600
601    return $msg;
602}
603
604sub _assert_ssl {
605    # Need IO::Socket::SSL 1.42 for SSL_create_ctx_callback
606    MongoDB::UsageError->throw(qq/IO::Socket::SSL 1.42 must be installed for SSL support\n/)
607      unless eval { require IO::Socket::SSL; IO::Socket::SSL->VERSION(1.42) };
608    # Need Net::SSLeay 1.49 for MODE_AUTO_RETRY
609    MongoDB::UsageError->throw(qq/Net::SSLeay 1.49 must be installed for SSL support\n/)
610      unless eval { require Net::SSLeay; Net::SSLeay->VERSION(1.49) };
611}
612
613# Try to find a CA bundle to validate the SSL cert,
614# prefer Mozilla::CA or fallback to a system file
615sub _find_CA_file {
616    my $self = shift();
617
618    return $self->SSL_options->{SSL_ca_file}
619      if $self->SSL_options->{SSL_ca_file} and -e $self->SSL_options->{SSL_ca_file};
620
621    return Mozilla::CA::SSL_ca_file()
622      if eval { require Mozilla::CA };
623
624    # cert list copied from golang src/crypto/x509/root_unix.go
625    foreach my $ca_bundle (
626        "/etc/ssl/certs/ca-certificates.crt",     # Debian/Ubuntu/Gentoo etc.
627        "/etc/pki/tls/certs/ca-bundle.crt",       # Fedora/RHEL
628        "/etc/ssl/ca-bundle.pem",                 # OpenSUSE
629        "/etc/openssl/certs/ca-certificates.crt", # NetBSD
630        "/etc/ssl/cert.pem",                      # OpenBSD
631        "/usr/local/share/certs/ca-root-nss.crt", # FreeBSD/DragonFly
632        "/etc/pki/tls/cacert.pem",                # OpenELEC
633        "/etc/certs/ca-certificates.crt",         # Solaris 11.2+
634    ) {
635        return $ca_bundle if -e $ca_bundle;
636    }
637
638    MongoDB::UsageError->throw(
639      qq/Couldn't find a CA bundle with which to verify the SSL certificate.\n/
640      . qq/Try installing Mozilla::CA from CPAN\n/);
641}
642
643sub _ssl_args {
644    my ( $self, $host ) = @_;
645
646    my %ssl_args;
647
648    # This test reimplements IO::Socket::SSL::can_client_sni(), which wasn't
649    # added until IO::Socket::SSL 1.84
650    if ( Net::SSLeay::OPENSSL_VERSION_NUMBER() >= 0x10000000 ) {
651        $ssl_args{SSL_hostname} = $host, # Sane SNI support
652    }
653
654    if ( Net::SSLeay::OPENSSL_VERSION_NUMBER() >= 0x10100000 ) {
655        $ssl_args{SSL_OP_NO_RENEGOTIATION} = Net::SSLeay::OP_NO_RENEGOTIATION();
656    }
657
658    $ssl_args{SSL_verifycn_scheme} = 'http';              # enable CN validation
659    $ssl_args{SSL_verifycn_name}   = $host;               # set validation hostname
660    $ssl_args{SSL_verify_mode}     = 0x01;                # enable cert validation
661    $ssl_args{SSL_ca_file}         = $self->_find_CA_file;
662
663    # user options override default settings
664    for my $k ( keys %{ $self->SSL_options } ) {
665        $ssl_args{$k} = $self->SSL_options->{$k} if $k =~ m/^SSL_/;
666    }
667
668    return \%ssl_args;
669}
670
6711;
672
673# vim: ts=4 sts=4 sw=4 et:
674