1# -----------------------------------------------------------------------------
2# $Id: Resolver.pm 35566 2009-10-09 14:53:58Z topia $
3# -----------------------------------------------------------------------------
# Simple resolver that runs lookups either in a worker thread or in blocking mode.
5# -----------------------------------------------------------------------------
6# copyright (C) 2004 Topia <topia@clovery.jp>. all rights reserved.
7package Tiarra::Resolver::QueueData;
8use strict;
9use warnings;
10use Tiarra::DefineEnumMixin (qw(ID TIMEOUT),
11			     qw(QUERY_TYPE QUERY_DATA ANSWER_STATUS ANSWER_DATA));
12use Tiarra::DefineEnumMixin (qw(ANSWER_OK ANSWER_NOT_FOUND ANSWER_TIMEOUT),
13			     qw(ANSWER_NOT_SUPPORTED ANSWER_INTERNAL_ERROR));
14use Tiarra::Utils;
15Tiarra::Utils->define_array_attr_accessor(
16    0, qw(id timeout query_type query_data answer_status answer_data));
17
18# attributes:
19#   timeout: not implemented yet
#   query_type: resolver-dependent data
#   query_data: resolver-dependent data
22#   answer_status: status
23#     * ANSWER_OK: resolved
24#     * ANSWER_NOT_FOUND: not found
25#     * ANSWER_TIMEOUT: timeout (not implemented yet)
26#     * ANSWER_NOT_SUPPORTED: not supported type (data: message)
27#     * ANSWER_INTERNAL_ERROR: internal error occurred (data: message)
28#   answer_data: data
29
# Constructor: stores the given values in a blessed array.
#
# Parameters:
#   $class  - class to bless the new object into
#   @fields - initial slot values, in positional order (presumably the order
#             of the accessors declared for this package -- confirm against
#             Tiarra::Utils); may be empty, in which case the fields are
#             filled in later through the accessors
#
# Returns the new object.
#
# FIXME: neither the type nor the value of the fields is checked.
sub new {
    my ($class, @fields) = @_;

    return bless [@fields], $class;
}
38
# Flatten the object into a plain array that can be put on a queue.
#
# The copy is declared with the ":shared" attribute; resolve() and
# resolver_thread() enqueue the result on Thread::Queue objects.
#
# Returns a reference to the copy (an unblessed array).
sub serialize {
    my ($this) = @_;

    my @snapshot : shared;
    @snapshot = @{$this};
    return \@snapshot;
}
45
# Rebuild an object from the array produced by serialize().
#
# Must be called on an instance: the class of the new object is taken from
# ref($this).
#
# Parameters:
#   $this   - any instance of the target class (its contents are ignored)
#   $fields - array reference holding the field values
#
# Returns a new object of the same class as $this.
sub parse {
    my ($this, $fields) = @_;

    my $class = ref $this;
    return $class->new(@{$fields});
}
51
52package Tiarra::Resolver;
53use strict;
54use warnings;
55use Tiarra::OptionalModules;
56use Tiarra::SharedMixin;
57use Tiarra::WrapMainLoop;
58use Tiarra::TerminateManager;
59use Socket;
60use Carp;
61use Net::hostent;
62use Tiarra::DefineEnumMixin qw(QUERY_HOST QUERY_ADDR QUERY_SADDR QUERY_NAMEINFO);
# Class of the objects that carry a query together with its answer.
my $dataclass = 'Tiarra::Resolver::QueueData';

# Feature switches, all decided at compile time in the BEGIN block below:
#   $use_threads                - lookups are delegated to a worker thread
#   $use_ipv6                   - Socket6 is available and used for lookups
#   $use_threads_state_checking - the installed threads has is_running()
our ($use_threads, $use_ipv6, $use_threads_state_checking);

BEGIN {
    # Threads are never enabled while perl only checks syntax ($^C).
    $use_threads = !$^C && Tiarra::OptionalModules->threads;
    if ($use_threads) {
        require threads;
        require threads::shared;
        require Thread::Queue;
        # threads 1.33 or earlier does not support $thr->is_running()
        $use_threads_state_checking = threads->can('is_running');
    }

    $use_ipv6 = Tiarra::OptionalModules->ipv6;
    if ($use_ipv6) {
        eval 'use Socket6;';
    }
    else {
        # Dummy constants so that the code mentioning them still compiles;
        # every use of them is guarded by $use_ipv6.
        no strict 'refs';
        *{$_} = sub () { undef }
            for qw(AI_NUMERICHOST NI_NUMERICHOST NI_NAMEREQD);
    }
}

# Fast initialization (original note: "for minimal thread").
# NOTE(review): presumably this instantiates the shared instance early so the
# worker thread is created while the process is still small -- confirm
# against Tiarra::SharedMixin.
__PACKAGE__->shared if $use_threads;
92
# Initialise a freshly blessed resolver object.
#
# In threaded mode this creates the ask/reply queues, starts the worker
# thread, prepares (but does not install) the two main-loop hooks that drain
# the reply queue, and installs a termination hook that calls destruct().
# In every mode the id counter and the table of pending callbacks are reset.
#
# Returns $this.
sub init {
    my ($this) = @_;

    if ($use_threads) {
        # The queues must exist before the thread is started: they are
        # handed to it as arguments.
        $this->{$_} = Thread::Queue->new for qw(ask_queue reply_queue);
        $this->_create_thread();

        my $timer = Tiarra::WrapMainLoop->new(
            type     => 'timer',
            interval => 1,
            closure  => sub { $this->mainloop; },
        );
        $this->{main_timer} = $timer;

        my $loop = Tiarra::WrapMainLoop->new(
            type    => 'mainloop',
            closure => sub { $this->mainloop; },
        );
        $this->{main_loop} = $loop;

        my $hook = Tiarra::TerminateManager::Hook->new(
            sub { $this->destruct; });
        $this->{destructor} = $hook->install;
    }

    # Next query id, and the callbacks waiting for an answer keyed by id.
    @{$this}{qw(id closures)} = (0, {});
    return $this;
}
120
# Internal constructor: blesses an empty hash into $class and initialises it.
#
# Returns whatever init() returns (the new object).
sub _new {
    my ($class) = @_;

    return bless({}, $class)->init;
}
128
# Make sure a worker thread is available, restarting it when it has stopped.
#
# Nothing is done when the installed threads module cannot report the state
# of a thread (no is_running()), or when the current thread is still running.
#
# Returns undef when no thread had to be created, otherwise the result of
# _create_thread().
sub _check_thread {
    my ($this) = @_;

    return undef unless $use_threads_state_checking;

    my $worker = $this->{thread};
    return undef if defined $worker && $worker->is_running();

    return $this->_create_thread;
}
139
# Start the worker thread and remember it in $this->{thread}.
#
# The thread runs resolver_thread() and receives the class name of $this
# plus the ask and reply queues as its arguments.
#
# Returns the new thread object.
sub _create_thread {
    my ($this) = @_;

    # Created in scalar context, exactly like a direct assignment to the slot.
    my $worker = threads->create(
        "resolver_thread",
        ref($this),
        @{$this}{qw(ask_queue reply_queue)},
    );
    $this->{thread} = $worker;
}
148
# Shut the worker thread down and deliver the answers that are still queued.
#
# Parameters:
#   $before_fork - when true, the termination hook installed by init() is
#                  left in place; otherwise it is uninstalled and forgotten
#
# In threaded mode an undef is queued as the stop marker for
# resolver_thread(), the thread is joined, and mainloop() is run once more so
# that the callbacks of already answered queries still fire.
#
# In blocking mode there is neither a queue nor a thread; the method then
# only handles the hook and returns instead of dying on an undefined value.
sub destruct {
    my ($this, $before_fork) = @_;

    if (!$before_fork && defined $this->{destructor}) {
        $this->{destructor}->uninstall;
        $this->{destructor} = undef;
    }

    # Nothing to stop when init() never created the threading machinery.
    return unless defined $this->{ask_queue};

    $this->{ask_queue}->enqueue(undef);
    $this->{thread}->join if defined $this->{thread};
    $this->mainloop;
}
161
# Start a lookup and report the result through a callback.
#
# Parameters:
#   $class_or_this  - class name or instance; the working instance is
#                     obtained through _this()
#   $type           - kind of lookup:
#       'addr'     forward lookup
#                    query data:    hostname
#                    callback data: [addr1, addr2, ...]
#       'host'     reverse lookup
#                    query data:    ip address
#                    callback data: hostname or [host1, host2, ...]
#       'saddr'    get socket addr
#                    query data:    [host, port]
#                    callback data: sockaddr struct
#       'nameinfo' get address/port from socket addr (always blocking)
#                    query data:    sockaddr struct
#                    callback data: [address, port]
#   $data           - query data as described above (required)
#   $closure        - code reference called with the answered entry object
#                     (required)
#   $my_use_threads - optional override of the package-wide $use_threads
#
# Croaks when $data or $closure is missing.
#
# Returns undef when the query was handed to the worker thread, the answered
# entry when it was resolved in blocking mode, and the return value of
# $closure when $type is not supported (the closure is then called at once
# with status ANSWER_NOT_SUPPORTED).
sub resolve {
    my ($class_or_this, $type, $data, $closure, $my_use_threads) = @_;
    my $this = $class_or_this->_this;

    $my_use_threads = $use_threads unless defined $my_use_threads;
    croak 'data not defined; please specify this' unless defined $data;
    croak 'closure not defined; please specify this' unless defined $closure;

    my %query_type_for = (
        addr     => QUERY_ADDR(),
        host     => QUERY_HOST(),
        saddr    => QUERY_SADDR(),
        nameinfo => QUERY_NAMEINFO(),
    );
    my $entry = $dataclass->new;
    # An undefined type is simply an unsupported one.
    my $supported = defined $type && exists $query_type_for{$type};

    # nameinfo: thread is not required
    $my_use_threads = 0 if $supported && $type eq 'nameinfo';
    # Threaded mode needs the queues built by init(); when they are missing
    # (threads unavailable) fall back to blocking instead of dying on undef.
    $my_use_threads = 0 unless defined $this->{ask_queue};
    local $use_threads = $my_use_threads;

    if (!$supported) {
        my $shown = defined $type ? $type : '';
        $entry->answer_status($entry->ANSWER_NOT_SUPPORTED);
        $entry->answer_data("typename '$shown' not supported");
        return $closure->($entry);
    }

    $entry->query_type($query_type_for{$type});
    $entry->query_data($data);
    $entry->timeout(0);
    $entry->id($this->{id}++);
    $this->{closures}->{$entry->id} = $closure;

    if ($use_threads) {
        $this->_check_thread();
        $this->{ask_queue}->enqueue($entry->serialize);
        # The hooks poll the reply queue until every callback has fired.
        $this->{main_timer}->lazy_install;
        $this->{main_loop}->lazy_install;
        return undef;
    }

    return $this->_call($this->_resolve($entry));
}
221
# Paranoid check: ip -> host -> ip.
#
# Resolves $data (an ip address) to a hostname, resolves that hostname back
# to addresses, and reports the outcome through $closure.
#
# Parameters:
#   $class_or_this  - class name or instance
#   $data           - ip address to verify
#   $closure        - callback, used like this:
#       sub {
#           my ($status, $hostname, $final_result) = @_;
#           if (!$status) { die "paranoid check failed!"; }
#           warn "paranoid check successful with: $hostname";
#           if (defined $final_result) { ... }   # maybe unnecessary
#       }
#   $my_use_threads - passed through to resolve()
#
# Returns whatever resolve() returns for the first stage.
sub paranoid_check {
    my ($class_or_this, $data, $closure, $my_use_threads) = @_;
    my $this = $class_or_this->_this;

    # stage 1: reverse lookup; an exception in the follow-up stage is
    # reported to the caller as a failed check.
    my $after_reverse_lookup = sub {
        my ($host_entry) = @_;
        eval {
            $this->_paranoid_stage1(
                $data, $closure, $my_use_threads, $host_entry);
        };
        if ($@) {
            $closure->(0, undef);
        }
    };

    return $this->resolve(
        'host', $data, $after_reverse_lookup, $my_use_threads);
}
243
# Second step of paranoid_check(): the reverse lookup has been answered.
#
# Parameters:
#   $data           - ip address being verified
#   $closure        - caller's callback (see paranoid_check)
#   $my_use_threads - passed through to resolve()
#   $entry          - answered entry of the reverse lookup
#
# When the reverse lookup failed, the callback is told so at once with
# (0, undef, $entry).  Otherwise the hostname is resolved forward and the
# answer is passed on to _paranoid_stage2(); an exception raised there is
# reported as (0, undef, $entry) with the reverse-lookup entry.
sub _paranoid_stage1 {
    my ($this, $data, $closure, $my_use_threads, $entry) = @_;

    return $closure->(0, undef, $entry)
        unless $entry->answer_status eq $entry->ANSWER_OK;

    my $answer = $entry->answer_data;
    # FIXME: support multiple hostname resolved
    my $host = ref($answer) eq 'ARRAY' ? $answer->[0] : $answer;

    my $after_forward_lookup = sub {
        my ($addr_entry) = @_;
        eval {
            $this->_paranoid_stage2(
                $data, $closure, $my_use_threads, $addr_entry);
        };
        if ($@) {
            $closure->(0, undef, $entry);
        }
    };

    return $this->resolve(
        'addr', $host, $after_forward_lookup, $my_use_threads);
}
265
# Final step of paranoid_check(): the forward lookup has been answered.
#
# Parameters:
#   $data           - ip address being verified
#   $closure        - caller's callback (see paranoid_check)
#   $my_use_threads - unused here
#   $entry          - answered entry of the forward lookup; its query_data
#                     is the hostname that was looked up
#
# The check succeeds when $data is one of the addresses the hostname resolves
# to; the callback then receives (1, $hostname, $entry).  In every other case
# it receives (0, undef, $entry) -- including a successful lookup that does
# not contain $data, which previously never called the callback at all.
sub _paranoid_stage2 {
    my ($this, $data, $closure, $my_use_threads, $entry) = @_;

    my $verified = $entry->answer_status eq $entry->ANSWER_OK
        && grep { $data eq $_ } @{$entry->answer_data};

    if ($verified) {
        $closure->(1, $entry->query_data, $entry);
    } else {
        $closure->(0, undef, $entry);
    }
}
277
# Deliver an answered entry to the callback registered for its id.
#
# An exception thrown by the callback is caught and passed to ::printmsg.
# The callback is forgotten afterwards; when no callback is left and
# threaded mode is active, both main-loop hooks are uninstalled again.
#
# Returns $entry.
sub _call {
    my ($this, $entry) = @_;

    my $id = $entry->id;
    eval { $this->{closures}->{$id}->($entry); };
    ::printmsg($@) if $@;

    # Removed only after the call, so the table is not seen as empty while
    # the callback itself is still running.
    delete $this->{closures}->{$id};

    if ($use_threads && !keys %{ $this->{closures} }) {
        $this->{$_}->lazy_uninstall for qw(main_timer main_loop);
    }
    return $entry;
}
291
# Perform one lookup synchronously and store the outcome in $entry.
#
# Called by resolve() in blocking mode and by resolver_thread() in threaded
# mode.  When $use_threads is set, arrays placed in the answer are shared so
# that they can travel through the reply queue.
#
# Parameters:
#   $class_or_this - invocant (not used)
#   $entry         - query object; query_type and query_data are read,
#                    answer_status and answer_data are written
#
# Answer data per query type:
#   QUERY_ADDR     - [addr1, addr2, ...]
#   QUERY_HOST     - hostname, or [host1, host2, ...] when there are several
#   QUERY_SADDR    - packed socket address
#   QUERY_NAMEINFO - [address, port]
#
# Returns $entry with answer_status set to
#   ANSWER_OK            - the lookup succeeded
#   ANSWER_NOT_FOUND     - supported query type, but nothing was resolved
#   ANSWER_NOT_SUPPORTED - unknown query type (answer_data holds a message)
sub _resolve {
    my ($class_or_this, $entry) = @_;

    my $resolved = undef;

    if ($entry->query_type eq QUERY_ADDR) {
        my @addrs;
        threads::shared::share(@addrs) if $use_threads;

        if ( $^O =~ /^MSWin32/ && $entry->query_data eq 'localhost' ) {
            # On Windows 2000 this query fails for some reason, so answer
            # with a fixed response.
            @addrs = ('127.0.0.1');
            if ($use_ipv6) {
                push(@addrs, '::1');
            }
            $resolved = 1;
        }
        if ($use_ipv6 && !$resolved) {
            my @res = getaddrinfo($entry->query_data, 0, AF_UNSPEC, SOCK_STREAM);
            my ($saddr, $addr, %addrs);
            # The result is consumed five elements at a time; only the
            # fourth one (the socket address) is used.
            while (scalar(@res) >= 5) {
                # check proto,... etc
                (undef, undef, undef, $saddr, undef, @res) = @res;
                ($addr, undef) = getnameinfo($saddr, NI_NUMERICHOST);
                # %addrs filters out duplicates while keeping the order.
                if (defined $addr && !$addrs{$addr}) {
                    $addrs{$addr} = 1;
                    push(@addrs, $addr);
                }
            }
            if (@addrs) {
                $resolved = 1;
            }
        }
        if (!$resolved) {
            # Fallback without Socket6 (or when it found nothing).
            my $hostent = Net::hostent::gethost($entry->query_data);
            if (defined $hostent) {
                @addrs = map {
                    inet_ntoa($_);
                } @{$hostent->addr_list};
                $resolved = 1;
            }
        }

        if ($resolved) {
            $entry->answer_data(\@addrs);
        }
    } elsif ($entry->query_type eq QUERY_HOST) {
        my @hosts;
        threads::shared::share(@hosts) if $use_threads;

        if ( $^O =~ /^MSWin32/ && $entry->query_data eq '127.0.0.1' ) {
            # On Windows 2000 this query fails for some reason, so answer
            # with a fixed response.
            @hosts = ('localhost');
            $resolved = 1;
        }
        if ($use_ipv6 && !$resolved) {
            my @res = getaddrinfo($entry->query_data, 0, AF_UNSPEC, SOCK_STREAM);
            my ($saddr, $host, %hosts);
            while (scalar(@res) >= 5) {
                # check proto,... etc
                (undef, undef, undef, $saddr, undef, @res) = @res;
                ($host, undef) = getnameinfo($saddr, NI_NAMEREQD);
                # %hosts filters out duplicates while keeping the order.
                if (defined $host && !$hosts{$host}) {
                    $hosts{$host} = 1;
                    push(@hosts, $host);
                }
            }
            if (@hosts) {
                $resolved = 1;
            }
        }
        if (!$resolved) {
            # Fallback without Socket6 (or when it found nothing).
            my $hostent = Net::hostent::gethost($entry->query_data);
            if (defined $hostent) {
                @hosts = ($hostent->name);
                $resolved = 1;
            }
        }

        if ($resolved) {
            # A single name is returned as a plain string, not as an array.
            $entry->answer_data(@hosts == 1 ? $hosts[0] : \@hosts);
        }
    } elsif ($entry->query_type eq QUERY_SADDR) {
        if ($use_ipv6 && !$resolved) {
            my @res = getaddrinfo($entry->query_data->[0],
                                  $entry->query_data->[1],
                                  AF_UNSPEC, SOCK_STREAM);
            # Only the socket address of the first result is used.
            my ($saddr);
            (undef, undef, undef, $saddr, undef, @res) = @res;
            if (defined $saddr) {
                $entry->answer_data($saddr);
                $resolved = 1;
            }
        }
        if (!$resolved) {
            my $addr = inet_aton($entry->query_data->[0]);
            if (defined $addr) {
                $entry->answer_data(pack_sockaddr_in($entry->query_data->[1],
                                                     $addr));
                $resolved = 1;
            }
        }
    } elsif ($entry->query_type eq QUERY_NAMEINFO) {
        my ($addr, $port);
        if ($use_ipv6 && !$resolved) {
            # NOTE(review): the result is not checked, so a failing
            # getnameinfo() is still reported as resolved -- confirm intent.
            ($addr, $port) = getnameinfo($entry->query_data, NI_NUMERICHOST);
            $resolved = 1;
        }
        if (!$resolved) {
            ($port, $addr) = sockaddr_in($entry->query_data);
            $resolved = 1;
        }
        if ($resolved) {
            my @data;
            threads::shared::share(@data) if $use_threads;
            @data = ($addr, $port);
            $entry->answer_data(\@data);
        }
    } else {
        carp 'unsupported query type('.$entry->query_type.')';
        $entry->answer_status($entry->ANSWER_NOT_SUPPORTED);
        $entry->answer_data('unsupported query type('.$entry->query_type.')');
        # Return right away: the generic status assignment below would
        # otherwise replace ANSWER_NOT_SUPPORTED with ANSWER_NOT_FOUND.
        return $entry;
    }

    if ($resolved) {
        $entry->answer_status($entry->ANSWER_OK);
    } else {
        $entry->answer_status($entry->ANSWER_NOT_FOUND);
    }
    return $entry;
}
425
# Body of the worker thread started by _create_thread().
#
# Parameters:
#   $class       - class whose _resolve() performs the lookups
#   $ask_queue   - queue of serialized queries; an undef item stops the loop
#   $reply_queue - queue that receives the serialized answers
#
# Every query is answered exactly once.  When resolving or serializing the
# answer throws, the entry is sent back with status ANSWER_INTERNAL_ERROR
# and the error text (plus a dump of any partial answer) as its data.
#
# Returns 0 once the stop marker has been dequeued.
sub resolver_thread {
    my ($class, $ask_queue, $reply_queue) = @_;

    while (defined(my $request = $ask_queue->dequeue)) {
        my $entry = $dataclass->new->parse($request);
        eval {
            $reply_queue->enqueue($class->_resolve($entry)->serialize);
        };
        next unless $@;

        my $message = $@;
        $entry->answer_status($entry->ANSWER_INTERNAL_ERROR);
        # Best effort only: a failure while dumping must not lose the
        # original error.
        eval {
            require Data::Dumper;
            my $partial = $entry->answer_data;
            if (defined $partial) {
                my $dump = Data::Dumper->new([$partial])
                    ->Terse(1)->Purity(1)->Dump;
                $message .= "(answer_data: " . $dump . ")\n";
            }
        };
        $entry->answer_data($message);
        $reply_queue->enqueue($entry->serialize);
    }
    return 0;
}
452
# Drain the reply queue: every answer that is waiting is turned back into an
# entry object and delivered to its callback through _call().
#
# Runs from the hooks prepared in init() and once more from destruct().
sub mainloop {
    my ($this) = @_;

    my $replies = $this->{reply_queue};
    # pending() is checked before each dequeue() so that the queue is only
    # read while something is waiting in it.
    while ($replies->pending) {
        my $raw = $replies->dequeue;
        $this->_call($dataclass->new->parse($raw));
    }
}
462
4631;
464