# -----------------------------------------------------------------------------
# $Id: Resolver.pm 35566 2009-10-09 14:53:58Z topia $
# -----------------------------------------------------------------------------
# Simple Resolver with multi-thread or blocking.
# -----------------------------------------------------------------------------
# copyright (C) 2004 Topia <topia@clovery.jp>. all rights reserved.
package Tiarra::Resolver::QueueData;
use strict;
use warnings;
use Tiarra::DefineEnumMixin (qw(ID TIMEOUT),
                             qw(QUERY_TYPE QUERY_DATA ANSWER_STATUS ANSWER_DATA));
use Tiarra::DefineEnumMixin (qw(ANSWER_OK ANSWER_NOT_FOUND ANSWER_TIMEOUT),
                             qw(ANSWER_NOT_SUPPORTED ANSWER_INTERNAL_ERROR));
use Tiarra::Utils;
Tiarra::Utils->define_array_attr_accessor(
    0, qw(id timeout query_type query_data answer_status answer_data));

# attributes:
#   timeout: not implemented yet
#   query_type: resolver dependent data
#   query_data: resolver dependent data
#   answer_status: status
#     * ANSWER_OK: resolved
#     * ANSWER_NOT_FOUND: not found
#     * ANSWER_TIMEOUT: timeout (not implemented yet)
#     * ANSWER_NOT_SUPPORTED: not supported type (data: message)
#     * ANSWER_INTERNAL_ERROR: internal error occurred (data: message)
#   answer_data: data

# new(@fields): build a queue entry; the object is a blessed array holding
# the given fields in order.
sub new {
    my $class = shift;

    # FIXME: check type/value!
    my $this = [@_];
    bless $this, $class;
    $this;
}

# serialize(): copy the fields into a ':shared' array and return a reference
# to it, so the entry can be put on a queue between threads.
sub serialize {
    my $this = shift;

    my @array : shared = @$this;
    \@array;
}

# parse($array_ref): build a new entry of the invocant's class from an array
# reference such as the one produced by serialize(). Must be called on an
# instance, because the class is taken from ref($this).
sub parse {
    my $this = shift;

    ref($this)->new(@{+shift});
}

package Tiarra::Resolver;
use strict;
use warnings;
use Tiarra::OptionalModules;
use Tiarra::SharedMixin;
use Tiarra::WrapMainLoop;
use Tiarra::TerminateManager;
use Socket;
use Carp;
use Net::hostent;
use Tiarra::DefineEnumMixin qw(QUERY_HOST QUERY_ADDR QUERY_SADDR QUERY_NAMEINFO);
my $dataclass = 'Tiarra::Resolver::QueueData';
our $use_threads;
our $use_ipv6;
our $use_threads_state_checking;
BEGIN {
    # Threads are never used under "perl -c" ($^C).
    $use_threads = !$^C && Tiarra::OptionalModules->threads;
    if ($use_threads) {
        require threads;
        require threads::shared;
        require Thread::Queue;
        ## threads 1.33 or earlier, not support $thr->is_running()
        $use_threads_state_checking = threads->can('is_running');
    }

    $use_ipv6 = Tiarra::OptionalModules->ipv6;
    if ($use_ipv6) {
        eval 'use Socket6;';
    } else {
        # dummy constants so that the barewords below still compile
        *AI_NUMERICHOST = sub () { undef };
        *NI_NUMERICHOST = sub () { undef };
        *NI_NAMEREQD = sub () { undef };
    }
}

if ($use_threads) {
    # fast initialize(for minimal thread)
    __PACKAGE__->shared;
}

# init(): set up the instance. In threaded mode this creates the ask/reply
# queues, starts the worker thread and prepares the main-loop hooks that
# poll the reply queue. Returns $this.
sub init {
    my $this = shift;

    if ($use_threads) {
        $this->{ask_queue} = Thread::Queue->new;
        $this->{reply_queue} = Thread::Queue->new;
        $this->_create_thread();
        $this->{main_timer} = Tiarra::WrapMainLoop->new(
            type => 'timer',
            interval => 1,
            closure => sub {
                $this->mainloop;
            });
        $this->{main_loop} = Tiarra::WrapMainLoop->new(
            type => 'mainloop',
            closure => sub {
                $this->mainloop;
            });
        $this->{destructor} = Tiarra::TerminateManager::Hook->new(
            sub {
                $this->destruct;
            })->install;
    }
    $this->{id} = 0;
    $this->{closures} = {};
    $this;
}

# _new(): constructor; blesses an empty hash and returns the result of init().
sub _new {
    my $class = shift;

    my $this = {};
    bless $this, $class;
    $this->init;
}

# _check_thread(): restart the worker thread when it is no longer running.
# Does nothing (returns undef) when thread state checking is unavailable or
# the thread is still running.
sub _check_thread {
    my $this = shift;

    if (!$use_threads_state_checking ||
            (defined $this->{thread} &&
                 $this->{thread}->is_running())) {
        return undef;
    }
    $this->_create_thread;
}

# _create_thread(): start resolver_thread() in a new thread, handing it the
# class name and both queues, and remember the thread object.
sub _create_thread {
    my $this = shift;

    $this->{thread} = threads->create("resolver_thread",
                                      ref($this),
                                      $this->{ask_queue},
                                      $this->{reply_queue});
}

# destruct($before_fork): stop the worker thread and deliver pending replies.
# Unless $before_fork is true, the terminate hook is uninstalled as well.
sub destruct {
    my $this = shift;
    my $before_fork = shift;

    if (!$before_fork && defined $this->{destructor}) {
        $this->{destructor}->uninstall;
        $this->{destructor} = undef;
    }
    # Without queues (non-threaded mode) there is no worker to stop and
    # nothing to drain.
    return unless defined $this->{ask_queue};

    # An undef item makes resolver_thread() leave its dequeue loop.
    $this->{ask_queue}->enqueue(undef);
    $this->{thread}->join if defined $this->{thread};
    $this->mainloop;
}

# resolve($type, $data, $closure, $my_use_threads)
#   $type / $data / data passed back in the entry:
#     addr:     forward lookup
#                 query data: hostname
#                 callback data: [addr1, addr2, ...]
#     host:     reverse lookup
#                 query data: ip address
#                 callback data: hostname or [host1, host2, ...]
#     saddr:    get socket addr
#                 query data: [host, port]
#                 callback data: sockaddr struct
#     nameinfo: get address/port from socket addr
#                 query data: sockaddr struct
#                 callback data: [address, port]
#   $closure: called with the answered entry.
#   $my_use_threads: optional; defaults to the current $use_threads.
# Croaks when $data or $closure is undefined. An unknown $type is reported
# to $closure with ANSWER_NOT_SUPPORTED.
sub resolve {
    my ($class_or_this, $type, $data, $closure, $my_use_threads) = @_;
    my $this = $class_or_this->_this;

    $my_use_threads = $use_threads unless defined $my_use_threads;
    croak 'data not defined; please specify this' unless defined $data;
    croak 'closure not defined; please specify this' unless defined $closure;
    my $entry = $dataclass->new;

    # Explicit parens keep the constants from swallowing the rest of the list.
    my %query_type_of = (
        addr     => QUERY_ADDR(),
        host     => QUERY_HOST(),
        saddr    => QUERY_SADDR(),
        nameinfo => QUERY_NAMEINFO(),
    );
    my $supported = exists $query_type_of{$type};
    if ($supported) {
        $entry->query_type($query_type_of{$type});
        $entry->query_data($data);
        $my_use_threads = 0 if $type eq 'nameinfo'; # thread is not required
    }

    # Threads can only be used when the worker queue was actually created;
    # otherwise fall back to blocking resolution instead of dying.
    my $threaded = $my_use_threads && defined $this->{ask_queue};
    local $use_threads = $threaded;

    if (!$supported) {
        $entry->answer_status($entry->ANSWER_NOT_SUPPORTED);
        $entry->answer_data("typename '$type' not supported");
        return $closure->($entry);
    }

    $entry->timeout(0);
    $entry->id($this->{id}++);
    $this->{closures}->{$entry->id} = $closure;
    if ($use_threads) {
        $this->_check_thread();
        $this->{ask_queue}->enqueue($entry->serialize);
        $this->{main_timer}->lazy_install;
        $this->{main_loop}->lazy_install;
        return undef;
    }
    return $this->_call($this->_resolve($entry));
}

# paranoid_check($data, $closure, $my_use_threads)
#   ip -> host -> ip check
#   closure: sub {
#       my ($status, $hostname, $final_result) = @_;
#       if (!$status) { die "paranoid check failed!"; }
#       warn "paranoid check successful with: $hostname";
#       if (defined $final_result) { /* maybe unnecessary */ }
#   }
sub paranoid_check {
    my ($class_or_this, $data, $closure, $my_use_threads) = @_;
    my $this = $class_or_this->_this;

    # stage 1: reverse lookup of the address
    $this->resolve(
        'host', $data, sub {
            eval {
                $this->_paranoid_stage1($data, $closure, $my_use_threads, shift);
            }; if ($@) {
                $closure->(0, undef);
            }
        }, $my_use_threads);
}

# _paranoid_stage1(...): handle the reverse-lookup answer. On success start
# the forward lookup of the (first) hostname; otherwise report failure.
sub _paranoid_stage1 {
    my ($this, $data, $closure, $my_use_threads, $entry) = @_;

    if ($entry->answer_status eq $entry->ANSWER_OK) {
        my $host = $entry->answer_data;
        if (ref($host) eq 'ARRAY') {
            # FIXME: support multiple hostname resolved
            $host = $host->[0];
        }
        $this->resolve(
            'addr', $host, sub {
                eval {
                    $this->_paranoid_stage2($data, $closure, $my_use_threads, shift);
                }; if ($@) {
                    $closure->(0, undef, $entry);
                }
            }, $my_use_threads);
    } else {
        $closure->(0, undef, $entry);
    }
}

# _paranoid_stage2(...): handle the forward-lookup answer. The check passes
# only when one of the resolved addresses equals the original address; every
# other outcome is reported to the closure as a failure.
sub _paranoid_stage2 {
    my ($this, $data, $closure, $my_use_threads, $entry) = @_;

    my $matched = 0;
    if ($entry->answer_status eq $entry->ANSWER_OK) {
        $matched = grep { $data eq $_ } @{$entry->answer_data};
    }

    if ($matched) {
        $closure->(1, $entry->query_data, $entry);
    } else {
        # Lookup failed, or the hostname does not map back to the address.
        $closure->(0, undef, $entry);
    }
}

# _call($entry): run the closure registered for the entry's id (errors are
# printed, not propagated), forget the closure, and uninstall the main-loop
# hooks once no request is pending. Returns $entry.
sub _call {
    my ($this, $entry) = @_;

    my $id = $entry->id;
    eval { $this->{closures}->{$id}->($entry); };
    if ($@) { ::printmsg($@); }
    delete $this->{closures}->{$id};
    if (!%{$this->{closures}} && $use_threads) {
        $this->{main_timer}->lazy_uninstall;
        $this->{main_loop}->lazy_uninstall;
    }
    $entry;
}

# _resolve($entry): perform the (blocking) lookup described by $entry, store
# the result in answer_data and set answer_status to ANSWER_OK,
# ANSWER_NOT_FOUND or ANSWER_NOT_SUPPORTED. Returns $entry.
sub _resolve {
    my ($class_or_this, $entry) = @_;

    my $resolved = undef;

    if ($entry->query_type eq QUERY_ADDR) {
        my @addrs;
        threads::shared::share(@addrs) if $use_threads;

        if ( $^O =~ /^MSWin32/ && $entry->query_data eq 'localhost' ) {
            # For some reason this query fails on Win2k, so answer with
            # fixed values.
            @addrs = ('127.0.0.1');
            if ($use_ipv6) {
                push(@addrs, '::1');
            }
            $resolved = 1;
        }
        if ($use_ipv6 && !$resolved) {
            my @res = getaddrinfo($entry->query_data, 0, AF_UNSPEC, SOCK_STREAM);
            my ($saddr, $addr, %addrs);
            # Results are consumed five elements at a time; only the
            # sockaddr (fourth element) is used.
            while (scalar(@res) >= 5) {
                # check proto,... etc
                (undef, undef, undef, $saddr, undef, @res) = @res;
                ($addr, undef) = getnameinfo($saddr, NI_NUMERICHOST);
                # %addrs filters out duplicates.
                if (defined $addr && !$addrs{$addr}) {
                    $addrs{$addr} = 1;
                    push(@addrs, $addr);
                }
            }
            if (@addrs) {
                $resolved = 1;
            }
        }
        if (!$resolved) {
            my $hostent = Net::hostent::gethost($entry->query_data);
            if (defined $hostent) {
                #$entry->answer_data($hostent->addr_list);
                @addrs = map {
                    inet_ntoa($_);
                } @{$hostent->addr_list};
                $resolved = 1;
            }
        }

        if ($resolved) {
            $entry->answer_data(\@addrs);
        }
    } elsif ($entry->query_type eq QUERY_HOST) {
        my @hosts;
        threads::shared::share(@hosts) if $use_threads;

        if ( $^O =~ /^MSWin32/ && $entry->query_data eq '127.0.0.1' ) {
            # For some reason this query fails on Win2k, so answer with
            # a fixed value.
            @hosts = ('localhost');
            $resolved = 1;
        }
        if ($use_ipv6 && !$resolved) {
            my @res = getaddrinfo($entry->query_data, 0, AF_UNSPEC, SOCK_STREAM);
            my ($saddr, $host, %hosts);
            while (scalar(@res) >= 5) {
                # check proto,... etc
                (undef, undef, undef, $saddr, undef, @res) = @res;
                ($host, undef) = getnameinfo($saddr, NI_NAMEREQD);
                # %hosts filters out duplicates.
                if (defined $host && !$hosts{$host}) {
                    $hosts{$host} = 1;
                    push(@hosts, $host);
                }
            }
            if (@hosts) {
                $resolved = 1;
            }
        }
        if (!$resolved) {
            my $hostent = Net::hostent::gethost($entry->query_data);
            if (defined $hostent) {
                @hosts = ($hostent->name);
                $resolved = 1;
            }
        }

        if ($resolved) {
            # A single hostname is returned as a plain scalar.
            $entry->answer_data(@hosts == 1 ? $hosts[0] : \@hosts);
        }
    } elsif ($entry->query_type eq QUERY_SADDR) {
        if ($use_ipv6 && !$resolved) {
            my @res = getaddrinfo($entry->query_data->[0],
                                  $entry->query_data->[1],
                                  AF_UNSPEC, SOCK_STREAM);
            # Only the sockaddr of the first result is used.
            my $saddr = $res[3];
            if (defined $saddr) {
                $entry->answer_data($saddr);
                $resolved = 1;
            }
        }
        if (!$resolved) {
            my $addr = inet_aton($entry->query_data->[0]);
            if (defined $addr) {
                $entry->answer_data(pack_sockaddr_in($entry->query_data->[1],
                                                     $addr));
                $resolved = 1;
            }
        }
    } elsif ($entry->query_type eq QUERY_NAMEINFO) {
        my ($addr, $port);
        if ($use_ipv6 && !$resolved) {
            # NOTE(review): the result of getnameinfo is not checked for
            # failure here -- confirm whether that is intended.
            ($addr, $port) = getnameinfo($entry->query_data, NI_NUMERICHOST);
            $resolved = 1;
        }
        if (!$resolved) {
            ($port, $addr) = sockaddr_in($entry->query_data);
            $resolved = 1;
        }
        if ($resolved) {
            my @data;
            threads::shared::share(@data) if $use_threads;
            @data = ($addr, $port);
            $entry->answer_data(\@data);
        }
    } else {
        carp 'unsupported query type('.$entry->query_type.')';
        $entry->answer_status($entry->ANSWER_NOT_SUPPORTED);
        $entry->answer_data('unsupported query type('.$entry->query_type.')');
        # Return right away so the status is not replaced by
        # ANSWER_NOT_FOUND below.
        return $entry;
    }

    if ($resolved) {
        $entry->answer_status($entry->ANSWER_OK);
    } else {
        $entry->answer_status($entry->ANSWER_NOT_FOUND);
    }
    return $entry;
}

# resolver_thread($ask_queue, $reply_queue): body of the worker thread.
# Takes serialized entries from $ask_queue until an undef item arrives,
# resolves each one and puts the serialized answer on $reply_queue. When
# resolving or serializing dies, an ANSWER_INTERNAL_ERROR entry carrying the
# error message is sent instead. Returns 0.
sub resolver_thread {
    my ($class, $ask_queue, $reply_queue) = @_;

    my ($data, $entry);
    while (defined ($data = $ask_queue->dequeue)) {
        $entry = $dataclass->new->parse($data);
        eval {
            $reply_queue->enqueue($class->_resolve($entry)->serialize);
        }; if ($@) {
            my $err = $@;
            $entry->answer_status($entry->ANSWER_INTERNAL_ERROR);
            # Best effort: append a dump of the answer data to the message.
            eval {
                require Data::Dumper;
                my $answer_data = $entry->answer_data;
                if (defined $answer_data) {
                    $err .= "(answer_data: " .
                        Data::Dumper->new([$entry->answer_data])->Terse(1)->
                              Purity(1)->Dump . ")\n";
                }
            };
            $entry->answer_data($err);
            $reply_queue->enqueue($entry->serialize);
        }
    }
    return 0;
}

# mainloop(): deliver every answer currently waiting on the reply queue to
# its closure.
sub mainloop {
    my $this = shift;

    my $entry;
    while ($this->{reply_queue}->pending) {
        $entry = $this->{reply_queue}->dequeue;
        $this->_call($dataclass->new->parse($entry));
    }
}

1;