# Copyright 2014 - present MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Some portions of this code were copied and adapted from the Perl module
# HTTP::Tiny, which is copyright Christian Hansen, David Golden and other
# contributors and used with permission under the terms of the Artistic License

use v5.8.0;
use strict;
use warnings;

package MongoDB::_Link;

use version;
our $VERSION = 'v2.2.2';

use Moo;
use Errno qw[EINTR EPIPE];
use IO::Socket qw[SOCK_STREAM];
use Scalar::Util qw/refaddr/;
use Socket qw/SOL_SOCKET SO_KEEPALIVE SO_RCVBUF IPPROTO_TCP TCP_NODELAY AF_INET/;
use Time::HiRes qw/time/;
use MongoDB::Error;
use MongoDB::_Constants;
use MongoDB::_Protocol;
use MongoDB::_Types qw(
    Boolish
    HostAddress
    NonNegNum
    Numish
    ServerDesc
);
use Types::Standard qw(
    HashRef
    Maybe
    Str
    Undef
);
use namespace::clean;

# Prefer IO::Socket::IP (0.32 or later) when it can be loaded; otherwise
# fall back to IO::Socket::INET.
my $SOCKET_CLASS =
  eval { require IO::Socket::IP; IO::Socket::IP->VERSION(0.32) }
  ? 'IO::Socket::IP'
  : 'IO::Socket::INET';

# "host:port" string of the server this link talks to
has address => (
    is       => 'ro',
    required => 1,
    isa      => HostAddress,
);

# passed as the socket constructor's Timeout; a negative value is passed
# on as undef (see connect)
has connect_timeout => (
    is      => 'ro',
    default => 20,
    isa     => Numish,
);

# passed as the select() timeout in read/write; may be undef
has socket_timeout => (
    is      => 'ro',
    default => 30,
    isa     => Numish|Undef,
);

has with_ssl => (
    is  => 'ro',
    isa => Boolish,
);

# user-supplied options; keys starting with "SSL_" override the defaults
# built in _ssl_args
has SSL_options => (
    is      => 'ro',
    default => sub { {} },
    isa     => HashRef,
);

# server description cached by set_metadata
has server => (
    is       => 'rwp',
    init_arg => undef,
    isa      => Maybe[ServerDesc],
);

has host => (
    is       => 'lazy',
    init_arg => undef,
    isa      => Str,
);

# Builder for 'host': the part of 'address' before the first colon.
sub _build_host {
    my ($self) = @_;
    my ($host, $port) = split /:/, $self->address;
    return $host;
}

# Limits populated by connect (default message size) and set_metadata
my @is_master_fields = qw(
    min_wire_version max_wire_version
    max_message_size_bytes max_write_batch_size max_bson_object_size
);

for my $f ( @is_master_fields ) {
    has $f => (
        is       => 'rwp',
        init_arg => undef,
        isa      => Maybe[NonNegNum],
    );
}

# Cached feature flags, set by set_metadata according to the wire versions
# the server accepts.  All share the same declaration.
my @feature_flag_fields = (
    # wire version >= 2
    'supports_write_commands',

    # wire version >= 3
    'supports_list_commands',
    'supports_scram_sha1',

    # wire version >= 4
    'supports_document_validation',
    'supports_explain_command',
    'supports_query_commands',
    'supports_find_modify_write_concern',
    'supports_fsync_command',
    'supports_read_concern',

    # wire version >= 5
    'supports_collation',
    'supports_helper_write_concern',
    'supports_x509_user_from_cert',

    # wire version >= 6
    'supports_arrayFilters',
    'supports_clusterTime',
    'supports_db_aggregation',
    'supports_retryWrites',
    'supports_op_msg',
    'supports_retryReads',

    # wire version >= 7
    'supports_4_0_changestreams',

    # wire version >= 8
    'supports_aggregate_out_read_concern',
);

for my $f ( @feature_flag_fields ) {
    has $f => (
        is       => 'rwp',
        init_arg => undef,
        isa      => Boolish,
    );
}

# Per-connection state; each field also gets a "_clear_<name>" clearer
my @connection_state_fields = qw(
    fh connected rcvbuf last_used fdset is_ssl
);

for my $f ( @connection_state_fields ) {
    has $f => (
        is       => 'rwp',
        clearer  => "_clear_$f",
        init_arg => undef,
    );
}

# Splits 'address' into 'host' and 'port' entries in the constructor
# argument hash.
# NOTE(review): 'host' is declared with init_arg => undef and no 'port'
# attribute is declared in this file, so these entries look unused --
# confirm before removing.
around BUILDARGS => sub {
    my $orig  = shift;
    my $class = shift;
    my $hr    = $class->$orig(@_);

    # shortcut on missing required field
    return $hr unless exists $hr->{address};

    ($hr->{host}, $hr->{port}) = split /:/, $hr->{address};

    return $hr;
};

# Opens the TCP connection to 'address', sets TCP_NODELAY and SO_KEEPALIVE,
# upgrades to SSL when 'with_ssl' is true, and records the connection state
# (fh, fdset, rcvbuf, last_used).  Returns $self.
#
# Throws MongoDB::UsageError when called with arguments, MongoDB::NetworkError
# when the socket cannot be created, and MongoDB::InternalError when
# binmode/setsockopt fail or the handle has no valid file descriptor.
sub connect {
    @_ == 1 || MongoDB::UsageError->throw( q/Usage: $handle->connect()/ . "\n" );
    my ($self) = @_;

    if ( $self->with_ssl ) {
        $self->_assert_ssl;
        # XXX possibly make SOCKET_CLASS an instance variable and set it here to IO::Socket::SSL
    }

    my ($host, $port) = split /:/, $self->address;

    # PERL-715: For 'localhost' where MongoDB is only listening on IPv4 and
    # getaddrinfo returns an IPv6 address before an IPv4 address, some
    # operating systems tickle a bug in IO::Socket::IP that causes
    # connection attempts to fail before trying the IPv4 address.  As a
    # workaround, we always force 'localhost' to use IPv4.

    my $fh = $SOCKET_CLASS->new(
        PeerHost => $ENV{TEST_MONGO_SOCKET_HOST} || $host,
        PeerPort => $port,
        ( lc($host) eq 'localhost' ? ( Family => AF_INET ) : () ),
        Proto   => 'tcp',
        Type    => SOCK_STREAM,
        Timeout => $self->connect_timeout >= 0 ? $self->connect_timeout : undef,
      )
      or
      MongoDB::NetworkError->throw(qq/Could not connect to '@{[$self->address]}': $@\n/);

    unless ( binmode($fh) ) {
        undef $fh;
        MongoDB::InternalError->throw(qq/Could not binmode() socket: '$!'\n/);
    }

    unless ( defined( $fh->setsockopt( IPPROTO_TCP, TCP_NODELAY, 1 ) ) ) {
        undef $fh;
        MongoDB::InternalError->throw(qq/Could not set TCP_NODELAY on socket: '$!'\n/);
    }

    unless ( defined( $fh->setsockopt( SOL_SOCKET, SO_KEEPALIVE, 1 ) ) ) {
        undef $fh;
        MongoDB::InternalError->throw(qq/Could not set SO_KEEPALIVE on socket: '$!'\n/);
    }

    $self->_set_fh($fh);
    $self->_set_connected(1);

    my $fd = fileno $fh;
    unless ( defined $fd && $fd >= 0 ) {
        $self->_close;
        MongoDB::InternalError->throw(qq/select(2): 'Bad file descriptor'\n/);
    }

    # bit vector with only this descriptor set, for use with select()
    vec( my $fdset = '', $fd, 1 ) = 1;
    $self->_set_fdset( $fdset );

    $self->start_ssl($host) if $self->with_ssl;

    $self->_set_last_used( time );
    $self->_set_rcvbuf( $fh->sockopt(SO_RCVBUF) );

    # Default max msg size is 2 * max BSON object size (DRIVERS-1)
    $self->_set_max_message_size_bytes( 2 * MAX_BSON_OBJECT_SIZE );

    return $self;
}

# Caches the given server description and derives the wire-version range,
# size limits and supports_* feature flags from its is_master document.
# Returns nothing.
sub set_metadata {
    my ( $self, $server ) = @_;
    $self->_set_server($server);
    $self->_set_min_wire_version( $server->is_master->{minWireVersion} || "0" );
    $self->_set_max_wire_version( $server->is_master->{maxWireVersion} || "0" );
    $self->_set_max_bson_object_size( $server->is_master->{maxBsonObjectSize}
          || MAX_BSON_OBJECT_SIZE );
    $self->_set_max_write_batch_size( $server->is_master->{maxWriteBatchSize}
          || MAX_WRITE_BATCH_SIZE );

    # Default is 2 * max BSON object size (DRIVERS-1)
    $self->_set_max_message_size_bytes( $server->is_master->{maxMessageSizeBytes}
          || 2 * $self->max_bson_object_size );

    if ( $self->accepts_wire_version(2) ) {
        $self->_set_supports_write_commands(1);
    }
    if ( $self->accepts_wire_version(3) ) {
        $self->_set_supports_list_commands(1);
        $self->_set_supports_scram_sha1(1);
    }
    if ( $self->accepts_wire_version(4) ) {
        $self->_set_supports_document_validation(1);
        $self->_set_supports_explain_command(1);
        $self->_set_supports_query_commands(1);
        $self->_set_supports_find_modify_write_concern(1);
        $self->_set_supports_fsync_command(1);
        $self->_set_supports_read_concern(1);
    }
    if ( $self->accepts_wire_version(5) ) {
        $self->_set_supports_collation(1);
        $self->_set_supports_helper_write_concern(1);
        $self->_set_supports_x509_user_from_cert(1);
    }
    if ( $self->accepts_wire_version(6) ) {
        $self->_set_supports_arrayFilters(1);
        $self->_set_supports_clusterTime(1);
        $self->_set_supports_db_aggregation(1);

        # retryable writes additionally need a session timeout and a
        # non-standalone server
        $self->_set_supports_retryWrites(
            defined( $server->logical_session_timeout_minutes )
              && ( $server->type ne 'Standalone' )
            ? 1
            : 0
        );
        $self->_set_supports_op_msg(1);
        $self->_set_supports_retryReads(1);
    }
    if ( $self->accepts_wire_version(7) ) {
        $self->_set_supports_4_0_changestreams(1);
    }
    if ( $self->accepts_wire_version(8) ) {
        $self->_set_supports_aggregate_out_read_concern(1);
    }

    return;
}

# True when $version lies within [min_wire_version, max_wire_version];
# unset bounds are treated as 0.
sub accepts_wire_version {
    my ( $self, $version ) = @_;
    my $min = $self->min_wire_version || 0;
    my $max = $self->max_wire_version || 0;
    return $version >= $min && $version <= $max;
}

# Upgrades the current handle to SSL via IO::Socket::SSL->start_SSL using
# the arguments from _ssl_args.  On failure the link is closed and
# MongoDB::HandshakeError is thrown.
sub start_ssl {
    my ( $self, $host ) = @_;

    my $ssl_args = $self->_ssl_args($host);
    IO::Socket::SSL->start_SSL(
        $self->fh,
        %$ssl_args,
        SSL_create_ctx_callback => sub {
            my $ctx = shift;
            Net::SSLeay::CTX_set_mode( $ctx, Net::SSLeay::MODE_AUTO_RETRY() );
        },
    );

    # success is detected by the handle now being an IO::Socket::SSL object
    unless ( ref( $self->fh ) eq 'IO::Socket::SSL' ) {
        my $ssl_err = IO::Socket::SSL->errstr;
        $self->_close;
        MongoDB::HandshakeError->throw(qq/SSL connection failed for $host: $ssl_err\n/);
    }
}

# Returns the subject of the certificate reported by the SSL handle's
# sock_certificate(), formatted with XN_FLAG_RFC2253, or "" when the handle
# is not an IO::Socket::SSL or no certificate/subject is available.
sub client_certificate_subject {
    my ($self) = @_;
    return "" unless $self->fh && $self->fh->isa("IO::Socket::SSL");

    my $client_cert = $self->fh->sock_certificate()
      or return "";

    my $subject_raw = Net::SSLeay::X509_get_subject_name($client_cert)
      or return "";

    my $subject =
      Net::SSLeay::X509_NAME_print_ex( $subject_raw, Net::SSLeay::XN_FLAG_RFC2253() );

    return $subject;
}

# Closes the link; throws MongoDB::NetworkError if the underlying close fails.
sub close {
    my ($self) = @_;
    $self->_close
      or MongoDB::NetworkError->throw(qq/Error closing socket: '$!'\n/);
}

# this is a quiet close so preexisting network errors can be thrown
sub _close {
    my ($self) = @_;
    $self->_clear_connected;
    my $ok = 1;
    if ( $self->fh ) {
        $ok = CORE::close( $self->fh );
        $self->_clear_fh;
    }
    return $ok;
}

# True when the link is flagged connected and still holds a handle.
sub is_connected {
    my ($self) = @_;
    return $self->connected && $self->fh;
}

# Writes $buf to the socket, compressing it first when the cached server has
# a compressor and $write_opt->{disable_compression} is not set.  Waits for
# writability with select() using 'socket_timeout' before each syswrite.
# Returns nothing.
#
# Throws MongoDB::ProtocolError when the message exceeds
# max_message_size_bytes, MongoDB::NetworkTimeout when select() times out,
# and MongoDB::NetworkError on select/write failures.  The link is closed
# before network errors and timeouts are thrown.
sub write {
    my ( $self, $buf, $write_opt ) = @_;
    $write_opt ||= {};

    if (
        !$write_opt->{disable_compression}
        && $self->server
        && $self->server->compressor
    ) {
        $buf = MongoDB::_Protocol::compress(
            $buf,
            $self->server->compressor,
        );
    }

    my ( $len, $off, $pending, $nfound, $r ) = ( length($buf), 0 );

    MongoDB::ProtocolError->throw(
        qq/Message of size $len exceeds maximum of / . $self->max_message_size_bytes )
      if $len > $self->max_message_size_bytes;

    local $SIG{PIPE} = 'IGNORE';

    while () {

        # do timeout
        ( $pending, $nfound ) = ( $self->socket_timeout, 0 );
        TIMEOUT: while () {
            if ( -1 == ( $nfound = select( undef, $self->fdset, undef, $pending ) ) ) {
                unless ( $! == EINTR ) {
                    # capture the error text before _close can change $!
                    my $os_error = "$!";
                    $self->_close;
                    MongoDB::NetworkError->throw(qq/select(2): '$os_error'\n/);
                }
                # to avoid overhead tracking monotonic clock times; assume
                # interrupts occur on average halfway through the timeout period
                # and restart with half the original time.  An undef timeout
                # means "block" and must stay undef: halving it would yield 0
                # and turn the next select() into a poll.
                $pending = int( $pending / 2 ) if defined $pending;
                redo TIMEOUT;
            }
            last TIMEOUT;
        }
        unless ($nfound) {
            $self->_close;
            MongoDB::NetworkTimeout->throw(
                qq/Timed out while waiting for socket to become ready for writing\n/);
        }

        # do write
        if ( defined( $r = syswrite( $self->fh, $buf, $len, $off ) ) ) {
            ( $len -= $r ), ( $off += $r );
            last unless $len > 0;
        }
        elsif ( $! == EPIPE ) {
            my $os_error = "$!";
            $self->_close;
            MongoDB::NetworkError->throw(qq/Socket closed by remote server: $os_error\n/);
        }
        elsif ( $! != EINTR ) {
            if ( $self->fh->can('errstr') ) {
                my $err = $self->fh->errstr();
                $self->_close;
                MongoDB::NetworkError->throw(qq/Could not write to SSL socket: '$err'\n /);
            }
            else {
                my $os_error = "$!";
                $self->_close;
                MongoDB::NetworkError->throw(qq/Could not write to socket: '$os_error'\n/);
            }

        }
    }

    $self->_set_last_used(time);

    return;
}

# Reads one complete message from the socket and returns it as a string.
# The first four bytes are unpacked with P_INT32 to obtain the total message
# length; reading continues until at least that many bytes have arrived.
#
# Throws MongoDB::ProtocolError when the announced length exceeds
# max_message_size_bytes, MongoDB::NetworkTimeout when select() times out,
# and MongoDB::NetworkError on select/read failures or premature EOF.  The
# link is closed before network errors and timeouts are thrown.
sub read {
    my ($self) = @_;

    # len of undef triggers first pass through loop
    my ( $msg, $len, $pending, $nfound, $r ) = ( '', undef );

    while () {

        # do timeout
        ( $pending, $nfound ) = ( $self->socket_timeout, 0 );
        TIMEOUT: while () {
            # no need to select if SSL and has pending data from a frame
            if ( $self->with_ssl && $self->fh->pending ) {
                $nfound = 1;
                last TIMEOUT;
            }

            if ( -1 == ( $nfound = select( $self->fdset, undef, undef, $pending ) ) ) {
                unless ( $! == EINTR ) {
                    # capture the error text before _close can change $!
                    my $os_error = "$!";
                    $self->_close;
                    MongoDB::NetworkError->throw(qq/select(2): '$os_error'\n/);
                }
                # to avoid overhead tracking monotonic clock times; assume
                # interrupts occur on average halfway through the timeout period
                # and restart with half the original time.  An undef timeout
                # means "block" and must stay undef: halving it would yield 0
                # and turn the next select() into a poll.
                $pending = int( $pending / 2 ) if defined $pending;
                redo TIMEOUT;
            }
            last TIMEOUT;
        }
        unless ($nfound) {
            $self->_close;
            MongoDB::NetworkTimeout->throw(
                q/Timed out while waiting for socket to become ready for reading/ . "\n" );
        }

        # read up to SO_RCVBUF if we can
        if ( defined( $r = sysread( $self->fh, $msg, $self->rcvbuf, length $msg ) ) ) {
            # because select said we're ready to read, if we read 0 then
            # we got EOF before the full message
            if ( !$r ) {
                $self->_close;
                MongoDB::NetworkError->throw(qq/Unexpected end of stream\n/);
            }
        }
        elsif ( $! != EINTR ) {
            if ( $self->fh->can('errstr') ) {
                my $err = $self->fh->errstr();
                $self->_close;
                MongoDB::NetworkError->throw(qq/Could not read from SSL socket: '$err'\n /);
            }
            else {
                my $os_error = "$!";
                $self->_close;
                MongoDB::NetworkError->throw(qq/Could not read from socket: '$os_error'\n/);
            }
        }

        if ( !defined $len ) {
            # need the 4-byte length prefix before we know how much to read
            next if length($msg) < 4;
            $len = unpack( P_INT32, $msg );
            MongoDB::ProtocolError->throw(
                qq/Server reply of size $len exceeds maximum of / . $self->max_message_size_bytes )
              if $len > $self->max_message_size_bytes;
        }
        last unless length($msg) < $len;
    }

    $self->_set_last_used(time);

    return $msg;
}

# Throws MongoDB::UsageError unless sufficiently recent IO::Socket::SSL and
# Net::SSLeay can be loaded.
sub _assert_ssl {
    # Need IO::Socket::SSL 1.42 for SSL_create_ctx_callback
    MongoDB::UsageError->throw(qq/IO::Socket::SSL 1.42 must be installed for SSL support\n/)
      unless eval { require IO::Socket::SSL; IO::Socket::SSL->VERSION(1.42) };
    # Need Net::SSLeay 1.49 for MODE_AUTO_RETRY
    MongoDB::UsageError->throw(qq/Net::SSLeay 1.49 must be installed for SSL support\n/)
      unless eval { require Net::SSLeay; Net::SSLeay->VERSION(1.49) };
}

# Try to find a CA bundle to validate the SSL cert,
# prefer Mozilla::CA or fallback to a system file.
# A user-supplied SSL_ca_file that exists on disk wins over both.
# Throws MongoDB::UsageError when nothing is found.
sub _find_CA_file {
    my $self = shift();

    return $self->SSL_options->{SSL_ca_file}
      if $self->SSL_options->{SSL_ca_file} and -e $self->SSL_options->{SSL_ca_file};

    return Mozilla::CA::SSL_ca_file()
      if eval { require Mozilla::CA };

    # cert list copied from golang src/crypto/x509/root_unix.go
    foreach my $ca_bundle (
        "/etc/ssl/certs/ca-certificates.crt",     # Debian/Ubuntu/Gentoo etc.
        "/etc/pki/tls/certs/ca-bundle.crt",       # Fedora/RHEL
        "/etc/ssl/ca-bundle.pem",                 # OpenSUSE
        "/etc/openssl/certs/ca-certificates.crt", # NetBSD
        "/etc/ssl/cert.pem",                      # OpenBSD
        "/usr/local/share/certs/ca-root-nss.crt", # FreeBSD/DragonFly
        "/etc/pki/tls/cacert.pem",                # OpenELEC
        "/etc/certs/ca-certificates.crt",         # Solaris 11.2+
    ) {
        return $ca_bundle if -e $ca_bundle;
    }

    MongoDB::UsageError->throw(
        qq/Couldn't find a CA bundle with which to verify the SSL certificate.\n/
        . qq/Try installing Mozilla::CA from CPAN\n/);
}

# Builds the argument hash for IO::Socket::SSL->start_SSL: default
# verification settings for $host, overridden by any "SSL_"-prefixed keys in
# 'SSL_options'.  Returns a hash reference.
sub _ssl_args {
    my ( $self, $host ) = @_;

    my %ssl_args;

    # This test reimplements IO::Socket::SSL::can_client_sni(), which wasn't
    # added until IO::Socket::SSL 1.84
    if ( Net::SSLeay::OPENSSL_VERSION_NUMBER() >= 0x10000000 ) {
        $ssl_args{SSL_hostname} = $host;    # Sane SNI support
    }

    if ( Net::SSLeay::OPENSSL_VERSION_NUMBER() >= 0x10100000 ) {
        $ssl_args{SSL_OP_NO_RENEGOTIATION} = Net::SSLeay::OP_NO_RENEGOTIATION();
    }

    $ssl_args{SSL_verifycn_scheme} = 'http';              # enable CN validation
    $ssl_args{SSL_verifycn_name}   = $host;               # set validation hostname
    $ssl_args{SSL_verify_mode}     = 0x01;                # enable cert validation
    $ssl_args{SSL_ca_file}         = $self->_find_CA_file;

    # user options override default settings
    for my $k ( keys %{ $self->SSL_options } ) {
        $ssl_args{$k} = $self->SSL_options->{$k} if $k =~ m/^SSL_/;
    }

    return \%ssl_args;
}

1;

# vim: ts=4 sts=4 sw=4 et: