#!/usr/bin/env perl

package Net::HandlerSocket::HSPool;

use strict;
use warnings;
use Net::HandlerSocket;
use Socket;

# Net::HandlerSocket::HSPool
#
# Keeps a pool of Net::HandlerSocket handles keyed by "host:port", opens
# indexes on demand, and offers helpers that reshape the flat result arrays
# into arrays/hashes of rows.
#
# Recognised keys of the config hashref (as read by the code below):
#   hostmap      : { "db.table" => { host => ..., port => ..., ... } }
#   error        : optional coderef called with the error message
#   warning      : optional coderef called with the warning message
#   resolve_list : optional coderef, hostname -> arrayref of addresses
#   resolve      : optional coderef, hostname -> single address

# Constructor.  $config is stored as-is; handles are (re)opened lazily and
# are considered stale after reopen_interval seconds.
sub new {
    my ($class, $config) = @_;
    my $self = {
        config          => $config,
        reopen_interval => 60,
        hostmap         => { },
    };
    return bless $self, $class;
}

# Forgets every pooled handle; the next request opens fresh ones.
sub clear_pool {
    my ($self) = @_;
    $self->{hostmap} = { };
}

# Reports an error: calls the configured 'error' handler (returning whatever
# it returns) or dies with $obj when no handler is configured.
sub on_error {
    my ($self, $obj) = @_;
    my $error_func = $self->{config}->{error};
    if (defined($error_func)) {
        return $error_func->($obj);
    }
    die $obj;
}

# Reports a warning through the configured 'warning' handler, if any.
# Without a handler the warning is dropped.
sub on_warning {
    my ($self, $obj) = @_;
    my $warning_func = $self->{config}->{warning};
    if (defined($warning_func)) {
        return $warning_func->($obj);
    }
    return;
}

# Returns a shallow copy of the hostmap entry for "db.table" with the
# defaults port=9998 and timeout=2 filled in.  Calls on_error and returns
# undef when the entry is missing.
sub get_conf {
    my ($self, $dbtbl) = @_;
    my $hcent = $self->{config}->{hostmap}->{$dbtbl};
    if (!defined($hcent)) {
        $self->on_error("get_conf: $dbtbl not found");
        return undef;
    }
    my %cpy = %$hcent;
    $cpy{port}    ||= 9998;
    $cpy{timeout} ||= 2;
    return \%cpy;
}

# Picks the next address to try for $hcent and stores it in $hcent->{host}.
#
#   $host_ip_list : undef on the first attempt; on retries, the list this
#                   sub returned previously (remaining candidates).
#
# Returns the arrayref of remaining candidates (possibly empty), or undef
# when there is nothing left to try / the name could not be resolved.
sub resolve_hostname {
    my ($self, $hcent, $host_ip_list) = @_;
    if (!defined($host_ip_list)) {
        my $host = $hcent->{host};      # unresolved name
        $hcent->{hostname} = $host;
        my $resolve_list_func = $self->{config}->{resolve_list};
        if (!defined($resolve_list_func)) {
            my $resolve_func = $self->{config}->{resolve};
            if (defined($resolve_func)) {
                $hcent->{host} = $resolve_func->($host);
                return [];
            }
            my $packed = gethostbyname($host);
            if (!defined($packed)) {
                return undef;
            }
            $hcent->{host} = inet_ntoa($packed);
            return [];
        }
        $host_ip_list = $resolve_list_func->($host);
        # A resolver that returns undef means "nothing found"; without this
        # check the dereference below would die under strict refs.
        if (!defined($host_ip_list)) {
            return undef;
        }
    }
    if (scalar(@$host_ip_list) > 0) {
        $hcent->{host} = shift(@$host_ip_list);
        return $host_ip_list;
    }
    return undef;   # no more ip
}

# Obtains (or opens) a handle for $db.$tbl, makes sure the index
# ($idx, $cols) is open on it, and executes the request.
#
#   $exec_multi : true  -> $exec_args is a list of command entries
#                          [ dummy, op, kfvals, limit, skip ]; slot 0 is
#                          overwritten with the index id.
#                 false -> $exec_args is [ op, kfvals, limit, skip ].
#
# Returns the result (error code stripped from the front of each result
# array), or undef when no configuration / no usable address is available.
# Errors with a negative code or code 2 close the handle and retry on the
# next address (NOTE(review): presumably these are connection-level
# failures -- confirm against the HandlerSocket client docs); other
# non-zero codes go to on_error.
sub get_handle_exec {
    my ($self, $db, $tbl, $idx, $cols, $exec_multi, $exec_args) = @_;
    my $now = time();
    my $dbtbl = join('.', $db, $tbl);
    my $hcent = $self->get_conf($dbtbl);    # copy
    if (!defined($hcent)) {
        return undef;
    }
    my $hmkey = join(':', $hcent->{host}, $hcent->{port});
    my $hment = $self->{hostmap}->{$hmkey};
    # [ open_time, handle, index_map, host, next_index_id ]
    my $host_ip_list;
    # Retry entry point: the error paths below clear $hment and jump back
    # here so the next candidate address is tried.
    TRY_OTHER_IP:
    if (!defined($hment) ||
        $hment->[0] + $self->{reopen_interval} < $now ||
        !$hment->[1]->stable_point()) {
        $host_ip_list = $self->resolve_hostname($hcent, $host_ip_list);
        if (!defined($host_ip_list)) {
            $self->on_error("HSPool::get_handle" .
                "($db, $tbl, $idx, $cols): host=$hmkey: " .
                "no more active ip");
            return undef;
        }
        my $hnd = Net::HandlerSocket->new($hcent);
        my %m = ();
        $hment = [ $now, $hnd, \%m, $hcent->{host}, 1 ];
        $self->{hostmap}->{$hmkey} = $hment;
    }
    my $hnd = $hment->[1];
    my $idxmap = $hment->[2];
    my $imkey = join(':', $idx, $cols);
    my $idx_id = $idxmap->{$imkey};
    if (!defined($idx_id)) {
        # First use of this (index, columns) pair on this handle.
        $idx_id = $hment->[4];
        my $e = $hnd->open_index($idx_id, $db, $tbl, $idx, $cols);
        if ($e != 0) {
            my $estr = $hnd->get_error();
            my $hostport = $hmkey . '(' . $hcent->{host} . ')';
            my $errmess = "HSPool::get_handle open_index" .
                "($db, $tbl, $idx, $cols): host=$hostport " .
                "err=$e($estr)";
            $self->on_warning($errmess);
            $hnd->close();
            $hment = undef;
            goto TRY_OTHER_IP;
        }
        $hment->[4]++;
        $idxmap->{$imkey} = $idx_id;
    }
    if ($exec_multi) {
        my $resarr;
        for my $cmdent (@$exec_args) {
            $cmdent->[0] = $idx_id;
        }
        if (scalar(@$exec_args) == 0) {
            $resarr = [];
        } else {
            $resarr = $hnd->execute_multi($exec_args);
        }
        my $i = 0;
        for my $res (@$resarr) {
            if ($res->[0] != 0) {
                my $cmdent = $exec_args->[$i];
                my $ec = $res->[0];
                my $estr = $res->[1];
                my $op = $cmdent->[1];
                my $kfvs = $cmdent->[2];
                my $kvstr = defined($kfvs)
                    ? join(',', @$kfvs) : '';
                my $limit = $cmdent->[3] || 0;
                my $skip = $cmdent->[4] || 0;
                my $hostport = $hmkey . '(' . $hcent->{host}
                    . ')';
                my $errmess = "HSPool::get_handle execm" .
                    "($db, $tbl, $idx, [$cols], " .
                    "($idx_id), $op, [$kvstr] " .
                    "$limit, $skip): " .
                    "host=$hostport err=$ec($estr)";
                if ($res->[0] < 0 || $res->[0] == 2) {
                    $self->on_warning($errmess);
                    $hnd->close();
                    $hment = undef;
                    goto TRY_OTHER_IP;
                } else {
                    $self->on_error($errmess);
                }
            }
            shift(@$res);   # drop the leading error code
            ++$i;
        }
        return $resarr;
    } else {
        my $res = $hnd->execute_find($idx_id, @$exec_args);
        if ($res->[0] != 0) {
            my ($op, $kfvals, $limit, $skip) = @$exec_args;
            my $ec = $res->[0];
            my $estr = $res->[1];
            # Guarded like the multi path so an undef key list cannot
            # kill the process while the real error is being reported.
            my $kvstr = defined($kfvals)
                ? join(',', @$kfvals) : '';
            my $hostport = $hmkey . '(' . $hcent->{host} . ')';
            my $errmess = "HSPool::get_handle exec" .
                "($db, $tbl, $idx, [$cols], ($idx_id), " .
                "$op, [$kvstr], $limit, $skip): " .
                "host=$hostport err=$ec($estr)";
            if ($res->[0] < 0 || $res->[0] == 2) {
                $self->on_warning($errmess);
                $hnd->close();
                $hment = undef;
                goto TRY_OTHER_IP;
            } else {
                $self->on_error($errmess);
            }
        }
        shift(@$res);       # drop the leading error code
        return $res;
    }
}

# Executes a single find.
#   cols   : comma separated list
#   kfvals : arrayref
# limit and skip default to 0.
sub index_find {
    my ($self, $db, $tbl, $idx, $cols, $op, $kfvals, $limit, $skip) = @_;
    $limit ||= 0;
    $skip ||= 0;
    my $res = $self->get_handle_exec($db, $tbl, $idx, $cols,
        0, [ $op, $kfvals, $limit, $skip ]);
    return $res;
}

# Executes several finds in one round trip.
#   cols    : comma separated list
#   cmdlist : [ dummy, op, kfvals, limit, skip ]
#   kfvals  : arrayref
sub index_find_multi {
    my ($self, $db, $tbl, $idx, $cols, $cmdlist) = @_;
    my $resarr = $self->get_handle_exec($db, $tbl, $idx, $cols,
        1, $cmdlist);
    return $resarr;
}

# Converts one flat result array into rows of $numcols values each and
# appends them to $ret (created when undef).  An undef $hsres yields no
# rows.  Trailing values that do not fill a whole row are ignored.
sub result_single_to_arrarr {
    my ($numcols, $hsres, $ret) = @_;
    return result_multi_to_arrarr($numcols,
        defined($hsres) ? [ $hsres ] : [ ], $ret);
}

# Same as result_single_to_arrarr for an arrayref of flat result arrays.
# The input arrays are read with slices and left unmodified: the previous
# splice-based loop removed elements while also advancing the offset, so
# every row after the first was taken from the wrong position.
sub result_multi_to_arrarr {
    my ($numcols, $mhsres, $ret) = @_;
    $ret = [ ] if !defined($ret);
    for my $hsres (@$mhsres) {
        my $hsreslen = scalar(@$hsres);
        my $rlen = int($hsreslen / $numcols);
        for (my $i = 0; $i < $rlen; ++$i) {
            my $p = $i * $numcols;
            my @a = @{$hsres}[$p .. $p + $numcols - 1];
            push(@$ret, \@a);
        }
    }
    return $ret;    # arrayref of arrayrefs
}

# Converts one flat result array into hashrefs keyed by @$names and appends
# them to $ret (created when undef).  An undef $hsres yields no rows.
sub result_single_to_hasharr {
    my ($names, $hsres, $ret) = @_;
    return result_multi_to_hasharr($names,
        defined($hsres) ? [ $hsres ] : [ ], $ret);
}

# Same as result_single_to_hasharr for an arrayref of flat result arrays.
sub result_multi_to_hasharr {
    my ($names, $mhsres, $ret) = @_;
    my $nameslen = scalar(@$names);
    $ret = [ ] if !defined($ret);
    for my $hsres (@$mhsres) {
        my $hsreslen = scalar(@$hsres);
        my $rlen = int($hsreslen / $nameslen);
        my $p = 0;
        for (my $i = 0; $i < $rlen; ++$i) {
            my %h = ();
            for (my $j = 0; $j < $nameslen; ++$j, ++$p) {
                $h{$names->[$j]} = $hsres->[$p];
            }
            push(@$ret, \%h);
        }
    }
    return $ret;    # arrayref of hashrefs
}

# Converts one flat result array into a hash of row hashrefs, keyed by the
# value of column $key.  Rows whose key value is undef are skipped; a later
# row with the same key replaces an earlier one.  An undef $hsres yields no
# rows.
sub result_single_to_hashhash {
    my ($names, $key, $hsres, $ret) = @_;
    return result_multi_to_hashhash($names, $key,
        defined($hsres) ? [ $hsres ] : [ ], $ret);
}

# Same as result_single_to_hashhash for an arrayref of flat result arrays.
sub result_multi_to_hashhash {
    my ($names, $key, $mhsres, $ret) = @_;
    my $nameslen = scalar(@$names);
    $ret = { } if !defined($ret);
    for my $hsres (@$mhsres) {
        my $hsreslen = scalar(@$hsres);
        my $rlen = int($hsreslen / $nameslen);
        my $p = 0;
        for (my $i = 0; $i < $rlen; ++$i) {
            my %h = ();
            for (my $j = 0; $j < $nameslen; ++$j, ++$p) {
                $h{$names->[$j]} = $hsres->[$p];
            }
            my $k = $h{$key};
            $ret->{$k} = \%h if defined($k);
        }
    }
    return $ret;    # hashref of hashrefs
}

sub select_cols_where_eq_aa {
    # SELECT $cols FROM $db.$tbl WHERE $idx_key = $kv LIMIT 1
    # Returns an arrayref of arrayrefs.
    my ($self, $db, $tbl, $idx, $cols_aref, $kv_aref) = @_;
    my $cols_str = join(',', @$cols_aref);
    my $res = $self->index_find($db, $tbl, $idx, $cols_str, '=', $kv_aref);
    return result_single_to_arrarr(scalar(@$cols_aref), $res);
}

sub select_cols_where_eq_hh {
    # SELECT $cols FROM $db.$tbl WHERE $idx_key = $kv LIMIT 1
    # Returns a hashref of row hashrefs keyed by column $retkey.
    my ($self, $db, $tbl, $idx, $cols_aref, $kv_aref, $retkey) = @_;
    my $cols_str = join(',', @$cols_aref);
    my $res = $self->index_find($db, $tbl, $idx, $cols_str, '=', $kv_aref);
    my $r = result_single_to_hashhash($cols_aref, $retkey, $res);
    return $r;
}

sub select_cols_where_in_hh {
    # SELECT $cols FROM $db.$tbl WHERE $idx_key in ($vals)
    # Issues one '=' lookup per value; returns a hashref of row hashrefs
    # keyed by column $retkey.
    my ($self, $db, $tbl, $idx, $cols_aref, $vals_aref, $retkey) = @_;
    my $cols_str = join(',', @$cols_aref);
    my @cmdlist = ();
    for my $v (@$vals_aref) {
        # Slot 0 is a placeholder; get_handle_exec fills in the index id.
        push(@cmdlist, [ -1, '=', [ $v ] ]);
    }
    my $res = $self->index_find_multi($db, $tbl, $idx, $cols_str,
        \@cmdlist);
    return result_multi_to_hashhash($cols_aref, $retkey, $res);
}

1;