1#!/usr/bin/env perl
2
3package Net::HandlerSocket::HSPool;
4
5use strict;
6use warnings;
7use Net::HandlerSocket;
8use Socket;
9
# Constructor.
#   $class  - class name the new object is blessed into
#   $config - configuration hashref; stored as-is under the 'config' key
# The new object starts with an empty connection cache ('hostmap') and a
# reopen interval of 60.
sub new {
	my ($class, $config) = @_;
	my %pool = (
		config => $config,
		reopen_interval => 60,
		hostmap => { },
	);
	return bless \%pool, $class;
}
18
# Drop every cached connection entry by replacing the object's
# 'hostmap' with a fresh empty hash. Returns that new hash reference.
sub clear_pool {
	my $self = shift;
	return $self->{hostmap} = { };
}
23
# Report an error.
#   $obj - the error value (a message string in this file's callers)
# When the configuration has an 'error' callback, it is invoked with
# $obj and its result is returned. Without a callback this dies with
# $obj.
sub on_error {
	my ($self, $obj) = @_;
	my $handler = $self->{config}->{error};
	die $obj if !defined($handler);
	return $handler->($obj);
}
32
# Report a warning.
#   $obj - the warning value (a message string in this file's callers)
# When the configuration has a 'warning' callback, it is invoked with
# $obj and its result is returned. Without a callback the warning is
# silently dropped.
sub on_warning {
	my ($self, $obj) = @_;
	my $handler = $self->{config}->{warning};
	if (defined $handler) {
		return $handler->($obj);
	}
}
40
# Look up the host configuration for a "db.table" key.
#   $dbtbl - key into the configured 'hostmap'
# Returns a shallow copy of the configured entry in which a false
# 'port' is replaced by 9998 and a false 'timeout' by 2. When the key
# is not configured, on_error is called and undef is returned (if
# on_error returns at all).
sub get_conf {
	my ($self, $dbtbl) = @_;
	my $entry = $self->{config}->{hostmap}->{$dbtbl};
	unless (defined $entry) {
		$self->on_error("get_conf: $dbtbl not found");
		return undef;
	}
	# Copy so callers may modify the result without touching the config.
	my $conf = { %$entry };
	$conf->{port} = 9998 unless $conf->{port};
	$conf->{timeout} = 2 unless $conf->{timeout};
	return $conf;
}
53
# Pick the next address to connect to for a host entry.
#   $hcent        - host configuration hashref (as returned by get_conf);
#                   its 'host' is overwritten with the chosen address
#   $host_ip_list - undef on the first attempt; on a retry, the arrayref
#                   returned by the previous call
# Returns an arrayref holding the addresses that are still untried
# (possibly empty), or undef when no address is available.
#
# On the first attempt the original name is saved in $hcent->{hostname}
# and resolved by, in order of preference: the configured 'resolve_list'
# callback (returns an arrayref of addresses), the configured 'resolve'
# callback (returns one address), or gethostbyname().
sub resolve_hostname {
	my ($self, $hcent, $host_ip_list) = @_;
	if (defined($host_ip_list)) {
		# Retry: consume the next candidate of the earlier resolution.
		if (scalar(@$host_ip_list) > 0) {
			$hcent->{host} = shift(@$host_ip_list);
			return $host_ip_list;
		}
		return undef; # no more ip
	}
	my $host = $hcent->{host}; # unresolved name
	$hcent->{hostname} = $host;
	my $resolve_list_func = $self->{config}->{resolve_list};
	if (defined($resolve_list_func)) {
		$host_ip_list = $resolve_list_func->($host);
		# A callback that signals failure with undef (or any other
		# non-reference) is treated like an empty list instead of
		# dying on the dereference.
		if (ref($host_ip_list) && scalar(@$host_ip_list) > 0) {
			$hcent->{host} = shift(@$host_ip_list);
			return $host_ip_list;
		}
		return undef; # no more ip
	}
	my $resolve_func = $self->{config}->{resolve};
	if (defined($resolve_func)) {
		$hcent->{host} = $resolve_func->($host);
		return []; # single address, nothing left to retry
	}
	my $packed = gethostbyname($host);
	if (!defined($packed)) {
		return undef;
	}
	$hcent->{host} = inet_ntoa($packed);
	return []; # single address, nothing left to retry
}
86
# Run one find request, or a batch of them, against the server that is
# configured for "$db.$tbl". Connections are cached per "host:port" and
# opened index ids are cached per connection.
#
#   $db, $tbl, $idx - database, table and index names
#   $cols           - comma separated column list
#   $exec_multi     - true:  $exec_args is an arrayref of commands that
#                            is handed to execute_multi; element 0 of
#                            every command is overwritten with the
#                            index id
#                     false: $exec_args is the arrayref
#                            [ op, kfvals, limit, skip ] that is passed
#                            to execute_find after the index id
#   $exec_args      - see $exec_multi
#
# Returns the result arrayref with its leading status code removed (in
# multi mode: an arrayref of such results). Returns undef when there is
# no configuration for "$db.$tbl" or no address is left to try, provided
# on_error returned instead of dying.
#
# A failing open_index, or an execute status that is negative or equal
# to 2, is reported through on_warning; the connection is then closed
# and the whole request is retried on the next address supplied by
# resolve_hostname. Any other non-zero status goes to on_error.
sub get_handle_exec {
	my ($self, $db, $tbl, $idx, $cols, $exec_multi, $exec_args) = @_;
	my $now = time();
	my $dbtbl = join('.', $db, $tbl);
	my $hcent = $self->get_conf($dbtbl); # copy
	if (!defined($hcent)) {
		return undef;
	}
	my $hmkey = join(':', $hcent->{host}, $hcent->{port});
	my $hment = $self->{hostmap}->{$hmkey};
		# [ open_time, handle, index_map, host, next_index_id ]
	my $host_ip_list;
	TRY_OTHER_IP:
	while (1) {
		# (Re)connect when there is no cached entry, the entry is
		# older than reopen_interval, or the handle reports that it
		# is not at a stable point.
		if (!defined($hment) ||
			$hment->[0] + $self->{reopen_interval} < $now ||
			!$hment->[1]->stable_point()) {
			$host_ip_list = $self->resolve_hostname($hcent,
				$host_ip_list);
			if (!defined($host_ip_list)) {
				$self->on_error("HSPool::get_handle" .
					"($db, $tbl, $idx, $cols): host=$hmkey: " .
					"no more active ip");
				return undef;
			}
			# Direct method call: the indirect form "new CLASS(...)"
			# depends on parser heuristics because this package
			# defines its own new().
			my $new_hnd = Net::HandlerSocket->new($hcent);
			$hment = [ $now, $new_hnd, { }, $hcent->{host}, 1 ];
			$self->{hostmap}->{$hmkey} = $hment;
		}
		my $hnd = $hment->[1];
		my $idxmap = $hment->[2];
		my $imkey = join(':', $idx, $cols);
		my $idx_id = $idxmap->{$imkey};
		if (!defined($idx_id)) {
			# First use of this index/column set on this connection.
			$idx_id = $hment->[4];
			my $e = $hnd->open_index($idx_id, $db, $tbl, $idx, $cols);
			if ($e != 0) {
				my $estr = $hnd->get_error();
				my $hostport = $hmkey . '(' . $hcent->{host} . ')';
				my $errmess = "HSPool::get_handle open_index" .
					"($db, $tbl, $idx, $cols): host=$hostport " .
					"err=$e($estr)";
				$self->on_warning($errmess);
				$hnd->close();
				$hment = undef; # forces a reconnect on the retry
				next TRY_OTHER_IP;
			}
			$hment->[4]++;
			$idxmap->{$imkey} = $idx_id;
		}
		if ($exec_multi) {
			for my $cmdent (@$exec_args) {
				$cmdent->[0] = $idx_id;
			}
			# An empty batch is answered locally.
			my $resarr = scalar(@$exec_args) == 0
				? [] : $hnd->execute_multi($exec_args);
			my $i = 0;
			for my $res (@$resarr) {
				if ($res->[0] != 0) {
					my $cmdent = $exec_args->[$i];
					my $ec = $res->[0];
					my $estr = $res->[1];
					my $op = $cmdent->[1];
					my $kfvs = $cmdent->[2];
					my $kvstr = defined($kfvs)
						? join(',', @$kfvs) : '';
					my $limit = $cmdent->[3] || 0;
					my $skip = $cmdent->[4] || 0;
					my $hostport = $hmkey . '(' .
						$hcent->{host} . ')';
					my $errmess = "HSPool::get_handle execm" .
						"($db, $tbl, $idx, [$cols], " .
						"($idx_id), $op, [$kvstr] " .
						"$limit, $skip): " .
						"host=$hostport err=$ec($estr)";
					if ($ec < 0 || $ec == 2) {
						$self->on_warning($errmess);
						$hnd->close();
						$hment = undef;
						next TRY_OTHER_IP;
					}
					$self->on_error($errmess);
				}
				shift(@$res); # drop the status code
				++$i;
			}
			return $resarr;
		}
		my $res = $hnd->execute_find($idx_id, @$exec_args);
		if ($res->[0] != 0) {
			my ($op, $kfvals, $limit, $skip) = @$exec_args;
			my $ec = $res->[0];
			my $estr = $res->[1];
			# Same guards as the multi branch, so that building the
			# message cannot itself fail on missing arguments.
			my $kvstr = defined($kfvals) ? join(',', @$kfvals) : '';
			$limit ||= 0;
			$skip ||= 0;
			my $hostport = $hmkey . '(' . $hcent->{host} . ')';
			my $errmess = "HSPool::get_handle exec" .
				"($db, $tbl, $idx, [$cols], ($idx_id), " .
				"$op, [$kvstr], $limit, $skip): " .
				"host=$hostport err=$ec($estr)";
			if ($ec < 0 || $ec == 2) {
				$self->on_warning($errmess);
				$hnd->close();
				$hment = undef;
				next TRY_OTHER_IP;
			}
			$self->on_error($errmess);
		}
		shift(@$res); # drop the status code
		return $res;
	}
}
204
# Single find request.
#   $cols   - comma separated list of column names
#   $kfvals - arrayref of key values
#   $limit, $skip - optional; a false value is sent as 0
# Delegates to get_handle_exec in single (non-multi) mode and returns
# its result.
sub index_find {
	my ($self, $db, $tbl, $idx, $cols, $op, $kfvals, $limit, $skip) = @_;
	my @find_args = ($op, $kfvals, $limit || 0, $skip || 0);
	my $found = $self->get_handle_exec($db, $tbl, $idx, $cols,
		0, \@find_args);
	return $found;
}
215
# Batch of find requests.
#   $cols    - comma separated list of column names
#   $cmdlist - arrayref of commands, each [ dummy, op, kfvals, limit, skip ]
#              where kfvals is an arrayref
# Delegates to get_handle_exec in multi mode and returns its result.
sub index_find_multi {
	my $self = shift;
	my ($db, $tbl, $idx, $cols, $cmdlist) = @_;
	my $results = $self->get_handle_exec($db, $tbl, $idx, $cols,
		1, $cmdlist);
	return $results;
}
225
# Split a flat result list into rows of $numcols values each.
#   $numcols - number of columns per row
#   $hsres   - arrayref of values, row after row; left unmodified
#   $ret     - optional arrayref to append the rows to
# Returns $ret (a new arrayref when none was passed): an arrayref of
# arrayrefs. Trailing values that do not fill a whole row are ignored.
sub result_single_to_arrarr {
	my ($numcols, $hsres, $ret) = @_;
	my $hsreslen = scalar(@$hsres);
	my $rlen = int($hsreslen / $numcols);
	$ret = [ ] if !defined($ret);
	for my $i (0 .. $rlen - 1) {
		# Slice rather than splice: removing elements while also
		# advancing the offset would skip every second row.
		my $p = $i * $numcols;
		push(@$ret, [ @$hsres[$p .. $p + $numcols - 1] ]);
	}
	return $ret; # arrayref of arrayrefs
}
240
# Split several flat result lists into rows of $numcols values each.
#   $numcols - number of columns per row
#   $mhsres  - arrayref of flat result arrayrefs; left unmodified
#   $ret     - optional arrayref to append the rows to
# Returns $ret (a new arrayref when none was passed): one arrayref of
# arrayrefs holding the rows of all results in order. Trailing values
# of a result that do not fill a whole row are ignored.
sub result_multi_to_arrarr {
	my ($numcols, $mhsres, $ret) = @_;
	$ret = [ ] if !defined($ret);
	for my $hsres (@$mhsres) {
		my $hsreslen = scalar(@$hsres);
		my $rlen = int($hsreslen / $numcols);
		for my $i (0 .. $rlen - 1) {
			# Slice rather than splice: removing elements while
			# also advancing the offset would skip every second
			# row.
			my $p = $i * $numcols;
			push(@$ret, [ @$hsres[$p .. $p + $numcols - 1] ]);
		}
	}
	return $ret; # arrayref of arrayrefs
}
256
# Turn a flat result list into a list of rows keyed by column name.
#   $names - arrayref of column names; its length is the row width
#   $hsres - arrayref of values, row after row
#   $ret   - optional arrayref to append the rows to
# Returns $ret (a new arrayref when none was passed): an arrayref of
# hashrefs. Trailing values that do not fill a whole row are ignored.
sub result_single_to_hasharr {
	my ($names, $hsres, $ret) = @_;
	my $width = scalar(@$names);
	my $rows = int(scalar(@$hsres) / $width);
	$ret = [ ] if !defined($ret);
	for my $r (0 .. $rows - 1) {
		my $base = $r * $width;
		my %row;
		@row{@$names} = @$hsres[$base .. $base + $width - 1];
		push(@$ret, \%row);
	}
	return $ret; # arrayref of hashrefs
}
273
# Turn several flat result lists into one list of rows keyed by
# column name.
#   $names  - arrayref of column names; its length is the row width
#   $mhsres - arrayref of flat result arrayrefs
#   $ret    - optional arrayref to append the rows to
# Returns $ret (a new arrayref when none was passed): an arrayref of
# hashrefs holding the rows of all results in order. Trailing values of
# a result that do not fill a whole row are ignored.
sub result_multi_to_hasharr {
	my ($names, $mhsres, $ret) = @_;
	my $width = scalar(@$names);
	$ret = [ ] if !defined($ret);
	for my $hsres (@$mhsres) {
		my $rows = int(scalar(@$hsres) / $width);
		for my $r (0 .. $rows - 1) {
			my $base = $r * $width;
			my %row;
			@row{@$names} = @$hsres[$base .. $base + $width - 1];
			push(@$ret, \%row);
		}
	}
	return $ret; # arrayref of hashrefs
}
292
# Turn a flat result list into a hash of rows.
#   $names - arrayref of column names; its length is the row width
#   $key   - name of the column whose value becomes the outer hash key
#   $hsres - arrayref of values, row after row
#   $ret   - optional hashref to add the rows to
# Returns $ret (a new hashref when none was passed): a hashref of
# hashrefs. Rows whose key column is undefined are skipped; a later row
# replaces an earlier one with the same key. Trailing values that do
# not fill a whole row are ignored.
sub result_single_to_hashhash {
	my ($names, $key, $hsres, $ret) = @_;
	my $width = scalar(@$names);
	my $rows = int(scalar(@$hsres) / $width);
	$ret = { } if !defined($ret);
	for my $r (0 .. $rows - 1) {
		my $base = $r * $width;
		my %row;
		@row{@$names} = @$hsres[$base .. $base + $width - 1];
		my $k = $row{$key};
		next if !defined($k);
		$ret->{$k} = \%row;
	}
	return $ret; # hashref of hashrefs
}
310
# Turn several flat result lists into one hash of rows.
#   $names  - arrayref of column names; its length is the row width
#   $key    - name of the column whose value becomes the outer hash key
#   $mhsres - arrayref of flat result arrayrefs
#   $ret    - optional hashref to add the rows to
# Returns $ret (a new hashref when none was passed): a hashref of
# hashrefs. Rows whose key column is undefined are skipped; a later row
# replaces an earlier one with the same key. Trailing values of a
# result that do not fill a whole row are ignored.
sub result_multi_to_hashhash {
	my ($names, $key, $mhsres, $ret) = @_;
	my $width = scalar(@$names);
	$ret = { } if !defined($ret);
	for my $hsres (@$mhsres) {
		my $rows = int(scalar(@$hsres) / $width);
		for my $r (0 .. $rows - 1) {
			my $base = $r * $width;
			my %row;
			@row{@$names} = @$hsres[$base .. $base + $width - 1];
			my $k = $row{$key};
			next if !defined($k);
			$ret->{$k} = \%row;
		}
	}
	return $ret; # hashref of hashrefs
}
330
# SELECT $cols FROM $db.$tbl WHERE $idx_key = $kv LIMIT 1
#   $cols_aref - arrayref of column names to fetch
#   $kv_aref   - arrayref of key values for index $idx
# Runs an '=' lookup through index_find and returns the rows as an
# arrayref of arrayrefs (see result_single_to_arrarr).
sub select_cols_where_eq_aa {
	my ($self, $db, $tbl, $idx, $cols_aref, $kv_aref) = @_;
	my $found = $self->index_find($db, $tbl, $idx,
		join(',', @$cols_aref), '=', $kv_aref);
	my $numcols = scalar(@$cols_aref);
	return result_single_to_arrarr($numcols, $found);
}
338
# SELECT $cols FROM $db.$tbl WHERE $idx_key = $kv LIMIT 1
#   $cols_aref - arrayref of column names to fetch
#   $kv_aref   - arrayref of key values for index $idx
#   $retkey    - column whose value keys the returned hash
# Runs an '=' lookup through index_find and returns the rows as a
# hashref of hashrefs (see result_single_to_hashhash).
sub select_cols_where_eq_hh {
	my ($self, $db, $tbl, $idx, $cols_aref, $kv_aref, $retkey) = @_;
	my $found = $self->index_find($db, $tbl, $idx,
		join(',', @$cols_aref), '=', $kv_aref);
	my $by_key = result_single_to_hashhash($cols_aref, $retkey, $found);
	return $by_key;
}
347
# SELECT $cols FROM $db.$tbl WHERE $idx_key in ($vals)
#   $cols_aref - arrayref of column names to fetch
#   $vals_aref - arrayref of key values; one '=' lookup is issued per
#                value, all in a single index_find_multi call
#   $retkey    - column whose value keys the returned hash
# Returns the rows as a hashref of hashrefs (see
# result_multi_to_hashhash).
sub select_cols_where_in_hh {
	my ($self, $db, $tbl, $idx, $cols_aref, $vals_aref, $retkey) = @_;
	my $cols_str = join(',', @$cols_aref);
	my @lookups;
	for my $val (@$vals_aref) {
		# Slot 0 is a placeholder; get_handle_exec fills in the
		# index id.
		my @cmd = (-1, '=', [ $val ]);
		push(@lookups, \@cmd);
	}
	my $found = $self->index_find_multi($db, $tbl, $idx, $cols_str,
		\@lookups);
	return result_multi_to_hashhash($cols_aref, $retkey, $found);
}
360
3611;
362
363