1# -*- cperl -*- 2# Copyright (c) 2013, 2021, Oracle and/or its affiliates. 3# All rights reserved. 4# 5# This program is free software; you can redistribute it and/or modify 6# it under the terms of the GNU General Public License, version 2.0, 7# as published by the Free Software Foundation. 8# 9# This program is also distributed with certain software (including 10# but not limited to OpenSSL) that is licensed under separate terms, 11# as designated in a particular file or component or in included license 12# documentation. The authors of MySQL hereby grant you an additional 13# permission to link the program and your derivative works with the 14# separately licensed software that they have included with MySQL. 15# 16# This program is distributed in the hope that it will be useful, 17# but WITHOUT ANY WARRANTY; without even the implied warranty of 18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 19# GNU General Public License, version 2.0, for more details. 20# 21# You should have received a copy of the GNU General Public License 22# along with this program; if not, write to the Free Software 23# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA 24 25 26########## Memcache Client Library for Perl 27### 28### $mc = My::Memcache->new() create an ascii-protocol client 29### $mc = My::Memcache::Binary->new() create a binary-protocol client 30### 31### $mc->connect(host, port) returns 1 on success, 0 on failure 32### 33### $mc->{error} holds most recent error/status message 34### 35### $mc->store(cmd, key, value, ...) alternate API for set/add/replace/append/prepend 36### $mc->set(key, value) returns 1 on success, 0 on failure 37### $mc->add(key, value) set if record does not exist 38### $mc->replace(key, value) set if record exists 39### $mc->append(key, value) append value to existing data 40### $mc->prepend(key, value) prepend value to existing data 41### 42### $mc->get(key, [ key ...]) returns a value or undef 43### $mc->next_result() Fetch results after get() 44### 45### $mc->delete(key) returns 1 on success, 0 on failure 46### $mc->stats(stat_key) get stats; returns a hash 47### $mc->incr(key, amount, [initial]) returns the new value or undef 48### $mc->decr(key, amount, [initial]) like incr. 49### The third argument is used in 50### the binary protocol ONLY. 51### $mc->flush() flush_all 52### 53### $mc->set_expires(sec) Set TTL for all store operations 54### $mc->set_flags(int_flags) Set numeric flags for store operations 55### 56### $mc->note_config_version() 57### Store the generation number of the running config in the filesystem, 58### for later use by wait_for_reconf() 59### 60### $mc->wait_for_reconf() 61### Wait for NDB/Memcache to complete online reconfiguration. 62### Returns the generation number of the newly running configuration, 63### or zero on timeout/error. 64 65################ TODO ################################ 66### * Support explicit binary k/q commands with pipelining 67### * Implement TOUCH & GAT commands 68### * Support UDP 69### * Standardize APIs to take (key, value, hashref-of-options) 70 71use strict; 72use IO::Socket::INET; 73use IO::File; 74use Carp; 75use Time::HiRes; 76use Errno qw( EWOULDBLOCK ); 77 78######## Memcache Result 79 80package My::Memcache::Result; 81 82sub new { 83 my ($pkg, $key, $flags, $cas) = @_; 84 $cas = 0 if(! defined($cas)); 85 bless { 86 "key" => $key, 87 "flags" => $flags, 88 "cas" => $cas, 89 "value" => undef, 90 }, $pkg; 91} 92 93 94######## Memcache Client 95 96package My::Memcache; 97 98sub new { 99 my $pkg = shift; 100 # min/max wait refer to msec. wait during temporary errors. Both powers of 2. 101 # io_timeout is in seconds (possibly fractional) 102 # io_timeout * max_read_tries = read timeout 103 bless { "created" => 1 , "error" => "OK" , "cf_gen" => 0, "req_id" => 0, 104 "min_wait" => 4, "max_wait" => 8192, "temp_errors" => 0 , 105 "total_wait" => 0, "has_cas" => 0, "flags" => 0, "exptime" => 0, 106 "get_results" => undef, "get_with_cas" => 0, "failed" => 0, 107 "io_timeout" => 5.0, "sysread_size" => 512, "max_read_tries" => 6, 108 "readbuf" => "", "buflen" => 0, "error_detail" => "", "read_try" => 0, 109 "max_write_tries" => 6 110 }, $pkg; 111} 112 113 114sub summarize { 115 my $val = shift; 116 my $len = length $val; 117 118 if($len > 25) { 119 return substr($val,0,10) . "..." . substr($val,-10) . " [len $len]"; 120 } else { 121 return $val; 122 } 123} 124 125 126# fail() is called when an MTR test fails 127sub fail { 128 my $self = shift; 129 my $fd; 130 131 if($self->{failed}) { 132 print STDERR " /// My::Memcache::fail() called recursively.\n"; 133 return; 134 } 135 $self->{failed} = 1; 136 137 my $msg = 138 "error: " . $self->{error} ."\t". 139 "read_try: " . $self->{read_try} ."\t". 140 "protocol: " . $self->protocol() ."\n". 141 "req_id: " . $self->{req_id} ."\t". 142 "temp err wait: " . $self->{total_wait} ." msec.\n"; 143 144 $msg .= "detail: " . $self->{error_detail} . "\n"; 145 $msg .= "buffer: " . summarize($self->{readbuf}) . "\n"; 146 147 my $r = $self->next_result(); 148 $msg .= "value: " . summarize($r->{value}) . "\n" if($r); 149 150 while(my $extra = shift) { 151 $msg .= $extra; 152 } 153 $msg .= "\n"; 154 155 $msg .= $self->get_server_error_stats(); 156 157 # Load Average on linux 158 $msg .= ("Load Avg: " . <$fd>) if(open($fd, "/proc/loadavg")); 159 160 $msg .= "====~~~~____~~~~====\n"; 161 162 Carp::confess($msg); 163} 164 165 166# Attempt a new connection to memcached to flush the server's error log 167# and obtain error statistics 168 169sub get_server_error_stats { 170 my $self = shift; 171 my $new_client = My::Memcache::Binary->new(); 172 my $r = $new_client->connect($self->{host}, $self->{port}); 173 my $msg = ""; 174 175 if($r) { 176 my %stats = $new_client->stats("errors"); # also flushes server log 177 $msg .= "Server error stats:\n"; 178 $msg .= sprintf("%s : %s\n", $_, $stats{$_}) for keys(%stats); 179 } else { 180 $msg = "Attempted new server connection to fetch error statistics but failed.\n"; 181 } 182 return $msg; 183} 184 185 186# Common code to ASCII and BINARY protocols: 187 188sub connect { 189 my $self = shift; 190 my $host = shift; 191 my $port = shift; 192 my $conn; 193 194 # Wait for memcached to be ready, up to ten seconds. 195 my $retries = 100; 196 do { 197 $conn = IO::Socket::INET->new(PeerAddr => "$host:$port", Proto => "tcp"); 198 if(! $conn) { 199 Time::HiRes::usleep(100 * 1000); 200 $retries--; 201 } 202 } while($retries && !$conn); 203 204 if($conn) { 205 $conn->blocking(0); # Set non-blocking 206 my $fd = fileno $conn; 207 my $fdset = ''; 208 vec($fdset, $fd, 1) = 1; 209 $self->{fdset} = $fdset; 210 $self->{connection} = $conn; 211 $self->{host} = $host; 212 $self->{port} = $port; 213 return 1; 214 } 215 $self->{error} = "CONNECTION_FAILED"; 216 return 0; 217} 218 219sub DESTROY { 220 my $self = shift; 221 if($self->{connection}) { 222 $self->{connection}->close(); 223 } 224} 225 226sub set_expires { 227 my $self = shift; 228 $self->{exptime} = shift; 229} 230 231sub set_flags { 232 my $self = shift; 233 $self->{flags} = shift; 234} 235 236# Some member variables are per-request. 237# Clear them in preparation for a new request, and increment the request counter. 238sub new_request { 239 my $self = shift; 240 $self->{error} = "OK"; 241 $self->{read_try} = 0; 242 $self->{has_cas} = 0; 243 $self->{req_id}++; 244 $self->{get_results} = undef; 245} 246 247sub next_result { 248 my $self = shift; 249 shift @{$self->{get_results}}; 250} 251 252# note_config_version and wait_for_reconf are only for use by mysql-test-run 253sub note_config_version { 254 my $self = shift; 255 256 my $vardir = $ENV{MYSQLTEST_VARDIR}; 257 # Fetch the memcached current config generation number and save it 258 my %stats = $self->stats("reconf"); 259 my $F = IO::File->new("$vardir/tmp/memcache_cf_gen", "w") or die; 260 my $ver = $stats{"Running"}; 261 print $F "$ver\n"; 262 $F->close(); 263 264 $self->{cf_gen} = $ver; 265} 266 267sub wait_for_reconf { 268 my $self = shift; 269 270 if($self->{cf_gen} == 0) { 271 my $cfgen = 0; 272 my $vardir = $ENV{MYSQLTEST_VARDIR}; 273 my $F = IO::File->new("$vardir/tmp/memcache_cf_gen", "r"); 274 if(defined $F) { 275 chomp($cfgen = <$F>); 276 undef $F; 277 } 278 $self->{cf_gen} = $cfgen; 279 } 280 281 my $wait_for = $self->{cf_gen} + 1 ; 282 283 my $new_gen = $self->wait_for_config_generation($wait_for); 284 if($new_gen > 0) { 285 $self->{cf_gen} = $new_gen; 286 } 287 else { 288 print STDERR "Wait for config generation $wait_for timed out.\n"; 289 } 290 291 return $new_gen; 292} 293 294# wait_for_config_generation($cf_gen) 295# Wait until memcached is running config generation >= to $cf_gen 296# Returns 0 on error/timeout, or the actual running generation number 297# 298sub wait_for_config_generation { 299 my $self = shift; 300 my $cf_gen = shift; 301 my $ready = 0; 302 my $retries = 100; # 100 retries x 100 ms = 10s 303 304 while($retries && ! $ready) { 305 Time::HiRes::usleep(100 * 1000); 306 my %stats = $self->stats("reconf"); 307 if($stats{"Running"} >= $cf_gen) { 308 $ready = $stats{"Running"}; 309 } 310 else { 311 $retries -= 1; 312 } 313 } 314 return $ready; 315} 316 317# ----------------------------------------------------------------------- 318# -------------- Low-level Network Handling --------------- 319# ----------------------------------------------------------------------- 320 321# Utility function sets error based on network error & returns false. 322sub socket_error { 323 my $self = shift; 324 my $retval = shift; 325 my $detail = shift; 326 327 if($retval == 1) { 328 $self->{error} = "CONNECTION_CLOSED"; 329 } elsif($retval == 0) { 330 $self->{error} = "NETWORK_TIMEOUT"; 331 } else { 332 $self->{error} = "NETWORK_ERROR: " . $!; 333 } 334 $self->{error_detail} = $detail if($detail); 335 336 return 0; 337} 338 339# $mc->write(packet). Returns true on success, false on error. 340sub write { 341 my $self = shift; 342 my $packet = shift; 343 my $len = length($packet); 344 my $nsent = 0; 345 my $attempt = 0; 346 my $r; 347 348 if(! $self->{connection}->connected()) { 349 return $self->socket_error(0, "write(): not connected"); 350 } 351 352 while($nsent < $len) { 353 $r = select(undef, $self->{fdset}, undef, $self->{io_timeout}); 354 if($r < 1) { 355 if(++$attempt >= $self->{max_write_tries}) { 356 return $self->socket_error($r, "write(): select() returned $r"); 357 } 358 } else { 359 $r = $self->{connection}->send(substr($packet, $nsent)); 360 if($r > 0) { 361 $nsent += $r; 362 } elsif($! != Errno::EWOULDBLOCK) { 363 return $self->socket_error($r, "write(): send() errno $!"); 364 } 365 } 366 } 367 return 1; 368} 369 370# $mc->read(desired_size). Low-level read. Returns true on success, 371# appends data to readbuf, and sets buflen. Returns false on error. 372sub read { 373 my $self = shift; 374 my $length = shift; 375 my $sock = $self->{connection}; 376 my $r; 377 378 if($length > 0) { 379 $r = select($self->{fdset}, undef, undef, $self->{io_timeout}); 380 return $self->socket_error($r, "read(): select() $!") if($r < 0); 381 382 $r = $sock->sysread($self->{readbuf}, $length, $self->{buflen}); 383 if($r > 0) { 384 $self->{buflen} += $r; 385 } elsif($r < 0 && $! != Errno::EWOULDBLOCK) { 386 return $self->socket_error( $r == 0 ? 1 : $r, "read(): sysread() $!"); 387 } 388 } 389 return 1; 390} 391 392# Utility routine; assumes $len is available on buffer. 393sub chop_from_buffer { 394 my $self = shift; 395 my $len = shift; 396 397 my $line = substr($self->{readbuf}, 0, $len); 398 $self->{readbuf} = substr($self->{readbuf}, $len); 399 $self->{buflen} -= $len; 400 return $line; 401} 402 403# Returns a line if available; otherwise undef 404sub get_line_from_buffer { 405 my $self = shift; 406 my $line = undef; 407 408 my $idx = index($self->{readbuf}, "\r\n"); 409 if($idx >= 0) { 410 $line = $self->chop_from_buffer($idx + 2); # 2 for \r\n 411 } 412 return $line; 413} 414 415# Returns length if available; otherwise undef 416sub get_length_from_buffer { 417 my $self = shift; 418 my $len = shift; 419 420 if($self->{buflen} >= $len) { 421 return $self->chop_from_buffer($len); 422 } 423 return undef; 424} 425 426# Read up to newline. Returns a line, or sets and returns error. 427sub read_line { 428 my $self = shift; 429 my $message; 430 431 $self->{read_try} = 0; 432 while((! defined($message)) && $self->{read_try} < $self->{max_read_tries}) { 433 $self->{read_try}++; 434 $message = $self->get_line_from_buffer(); 435 if(! defined($message)) { 436 if(! $self->read($self->{sysread_size})) { 437 return $self->{error}; 438 } 439 } 440 } 441 if(defined($message)) { 442 $self->normalize_error($message); # handle server error responses 443 return $message; 444 } 445 446 $self->socket_error(0, "read_line(): timeout"); 447 return $self->{error}; 448} 449 450# Read <length> bytes. Returns the data, or returns undef and sets error. 451sub read_known_length { 452 my $self = shift; 453 my $len = shift; 454 my $data; 455 456 $self->{read_try} = 0; 457 while($self->{read_try} < $self->{max_read_tries}) { 458 $self->{read_try}++; 459 $data = $self->get_length_from_buffer($len); 460 return $data if(defined($data)); 461 if(! $self->read($len - $self->{buflen})) { 462 return undef; 463 } 464 } 465 # Perhaps the read completed on the final attempt 466 $data = $self->get_length_from_buffer($len); 467 if(! defined($data)) { 468 $self->socket_error(0, "read_known_length(): timeout"); 469 } 470 return $data; 471} 472 473 474# ----------------------------------------------------------------------- 475# ------------------ ASCII PROTOCOL -------------------- 476# ----------------------------------------------------------------------- 477 478sub protocol { 479 return "ascii"; 480} 481 482sub protocol_error { 483 my $self = shift; 484 my $detail = shift; 485 486 if($self->{error} eq "OK") { 487 $self->{error} = "PROTOCOL_ERROR"; 488 } 489 if($detail) { 490 $self->{error_detail} = $detail; 491 } 492 return undef; 493} 494 495sub ascii_command { 496 my $self = shift; 497 my $packet = shift; 498 my $waitTime = $self->{min_wait}; 499 my $maxWait = $self->{max_wait}; 500 my $reply; 501 502 do { 503 $self->new_request(); 504 $self->write($packet); 505 $reply = $self->read_line(); 506 if($self->{error} eq "SERVER_TEMPORARY_ERROR" && $waitTime < $maxWait) { 507 $self->{temp_errors} += 1; 508 $self->{total_wait} += ( Time::HiRes::usleep($waitTime * 1000) / 1000); 509 $waitTime *= 2; 510 } 511 } while($self->{error} eq "SERVER_TEMPORARY_ERROR" && $waitTime <= $maxWait); 512 513 return $reply; 514} 515 516 517sub delete { 518 my $self = shift; 519 my $key = shift; 520 my $response = $self->ascii_command("delete $key\r\n"); 521 return 1 if($response =~ "^DELETED"); 522 return 0 if($response =~ "^NOT_FOUND"); 523 return 0 if($response =~ "^SERVER_ERROR"); 524 return $self->protocol_error("delete() got response: $response"); 525} 526 527 528sub store { 529 my ($self, $cmd, $key, $value, $flags, $exptime, $cas_chk) = @_; 530 $flags = $self->{flags} unless $flags; 531 $exptime = $self->{exptime} unless $exptime; 532 my $packet; 533 if(($cmd eq "cas" || $cmd eq "replace") && $cas_chk > 0) 534 { 535 $packet = sprintf("cas %s %d %d %d %d\r\n%s\r\n", 536 $key, $flags, $exptime, $cas_chk, length($value), $value); 537 } 538 else 539 { 540 $packet = sprintf("%s %s %d %d %d\r\n%s\r\n", 541 $cmd, $key, $flags, $exptime, length($value), $value); 542 } 543 my $response = $self->ascii_command($packet); 544 return 1 if($response =~ "^STORED"); 545 return 0 if($response =~ "^NOT_STORED"); 546 return 0 if($response =~ "^EXISTS"); 547 return 0 if($response =~ "^NOT_FOUND"); 548 return 0 if($response =~ "^SERVER_ERROR"); 549 return $self->protocol_error("store() got response: $response"); 550} 551 552sub set { 553 my ($self, $key, $value, $flags, $exptime) = @_; 554 return $self->store("set", $key, $value, $flags, $exptime); 555} 556 557 558sub add { 559 my ($self, $key, $value, $flags, $exptime) = @_; 560 return $self->store("add", $key, $value, $flags, $exptime); 561} 562 563 564sub append { 565 my ($self, $key, $value, $flags, $exptime) = @_; 566 return $self->store("append", $key, $value, $flags, $exptime); 567} 568 569 570sub prepend { 571 my ($self, $key, $value, $flags, $exptime) = @_; 572 return $self->store("prepend", $key, $value, $flags, $exptime); 573} 574 575 576sub replace { 577 my ($self, $key, $value, $flags, $exptime, $cas) = @_; 578 return $self->store("replace", $key, $value, $flags, $exptime, $cas); 579} 580 581 582sub get { 583 my $self = shift; 584 my @results; 585 my $keys = ""; 586 $keys .= shift(@_) . " " while(@_); 587 my $command = $self->{get_with_cas} ? "gets" : "get"; 588 $self->{get_with_cas} = 0; # CHECK, THEN RESET FOR NEXT CALL 589 my $response = $self->ascii_command("$command $keys\r\n"); 590 return undef if($self->{error} ne "OK"); 591 while ($response ne "END\r\n") 592 { 593 $response =~ /^VALUE (\S+) (\d+) (\d+) ?(\d+)?/; 594 if(! (defined($1) && defined($2) && defined($3))) { 595 return $self->protocol_error("GET response: $response"); 596 } 597 my $result = My::Memcache::Result->new($1, $2, $4); 598 my $value = $self->read_known_length($3); 599 return undef if(!defined($value)); 600 $result->{value} = $value; 601 $self->read_line(); # Get trailing \r\n after value 602 $self->{has_cas} = 1 if($4); 603 push @results, $result; 604 $response = $self->read_line(); 605 } 606 $self->{get_results} = \@results; 607 return $results[0]->{value} if @results; 608 $self->{error} = "NOT_FOUND"; 609 return undef; 610} 611 612 613sub _txt_math { 614 my ($self, $cmd, $key, $delta) = @_; 615 my $response = $self->ascii_command("$cmd $key $delta \r\n"); 616 617 if ($response =~ "^NOT_FOUND" || $response =~ "ERROR") { 618 return undef; 619 } 620 621 $response =~ /(\d+)/; 622 return $self->protocol_error("MATH response: $response") unless defined($1); 623 return $1; 624} 625 626 627sub incr { 628 my ($self, $key, $delta) = @_; 629 return $self->_txt_math("incr", $key, $delta); 630} 631 632 633sub decr { 634 my ($self, $key, $delta) = @_; 635 return $self->_txt_math("decr", $key, $delta); 636} 637 638 639sub stats { 640 my $self = shift; 641 my $key = shift || ""; 642 643 $self->new_request(); 644 $self->write("stats $key\r\n"); 645 646 my %response = (); 647 my $line = $self->read_line(); 648 while($line !~ "^END") { 649 return %response if $line eq "ERROR\r\n"; 650 if(($line) && ($line =~ /^STAT(\s+)(\S+)(\s+)(\S+)/)) { 651 $response{$2} = $4; 652 } else { 653 return $self->protocol_error("STATS response line: $line"); 654 } 655 $line = $self->read_line(); 656 } 657 658 return %response; 659} 660 661 662sub flush { 663 my $self = shift; 664 my $key = shift; 665 my $result = $self->ascii_command("flush_all\r\n"); 666 return ($self->{error} eq "OK"); 667} 668 669 670# Try to provide consistent error messagees across ascii & binary protocols 671sub normalize_error { 672 my $self = shift; 673 my $reply = shift; 674 my %error_message = ( 675 "STORED\r\n" => "OK", 676 "EXISTS\r\n" => "KEY_EXISTS", 677 "NOT_FOUND\r\n" => "NOT_FOUND", 678 "NOT_STORED\r\n" => "NOT_STORED", 679 "CLIENT_ERROR value too big\r\n" => "VALUE_TOO_LARGE", 680 "SERVER_ERROR object too large for cache\r\n" => "VALUE_TOO_LARGE", 681 "CLIENT_ERROR invalid arguments\r\n" => "INVALID_ARGUMENTS", 682 "SERVER_ERROR not my vbucket\r\n" => "NOT_MY_VBUCKET", 683 "SERVER_ERROR out of memory\r\n" => "SERVER_OUT_OF_MEMORY", 684 "SERVER_ERROR not supported\r\n" => "NOT_SUPPORTED", 685 "SERVER_ERROR internal\r\n" => "INTERNAL_ERROR", 686 "SERVER_ERROR temporary failure\r\n" => "SERVER_TEMPORARY_ERROR" 687 ); 688 $self->{error} = $error_message{$reply} || "OK"; 689 return 0; 690} 691 692# ----------------------------------------------------------------------- 693# ------------------ BINARY PROTOCOL -------------------- 694# ----------------------------------------------------------------------- 695 696package My::Memcache::Binary; 697BEGIN { @My::Memcache::Binary::ISA = qw(My::Memcache); } 698use constant BINARY_HEADER_FMT => "CCnCCnNNNN"; 699use constant BINARY_REQUEST => 0x80; 700use constant BINARY_RESPONSE => 0x81; 701 702use constant BIN_CMD_GET => 0x00; 703use constant BIN_CMD_SET => 0x01; 704use constant BIN_CMD_ADD => 0x02; 705use constant BIN_CMD_REPLACE => 0x03; 706use constant BIN_CMD_DELETE => 0x04; 707use constant BIN_CMD_INCR => 0x05; 708use constant BIN_CMD_DECR => 0x06; 709use constant BIN_CMD_QUIT => 0x07; 710use constant BIN_CMD_FLUSH => 0x08; 711use constant BIN_CMD_NOOP => 0x0A; 712use constant BIN_CMD_GETK => 0x0C; 713use constant BIN_CMD_GETKQ => 0x0D; 714use constant BIN_CMD_APPEND => 0x0E; 715use constant BIN_CMD_PREPEND => 0x0F; 716use constant BIN_CMD_STAT => 0x10; 717 718 719sub protocol { 720 return "binary"; 721} 722 723 724sub error_message { 725 my ($self, $code) = @_; 726 my %error_messages = ( 727 0x00 => "OK", 728 0x01 => "NOT_FOUND", 729 0x02 => "KEY_EXISTS", 730 0x03 => "VALUE_TOO_LARGE", 731 0x04 => "INVALID_ARGUMENTS", 732 0x05 => "NOT_STORED", 733 0x06 => "NON_NUMERIC_VALUE", 734 0x07 => "NOT_MY_VBUCKET", 735 0x81 => "UNKNOWN_COMMAND", 736 0x82 => "SERVER_OUT_OF_MEMORY", 737 0x83 => "NOT_SUPPORTED", 738 0x84 => "INTERNAL_ERROR", 739 0x85 => "SERVER_BUSY", 740 0x86 => "SERVER_TEMPORARY_ERROR", 741 0x100 => "PROTOCOL_ERROR", 742 0x101 => "NETWORK_ERROR" 743 ); 744 return $error_messages{$code}; 745} 746 747# Returns true on success, false on error 748sub send_binary_request { 749 my $self = shift; 750 my ($cmd, $key, $val, $extra_header, $cas) = @_; 751 752 $cas = 0 unless $cas; 753 my $key_len = length($key); 754 my $val_len = length($val); 755 my $extra_len = length($extra_header); 756 my $total_len = $key_len + $val_len + $extra_len; 757 my $cas_hi = ($cas >> 32) & 0xFFFFFFFF; 758 my $cas_lo = ($cas & 0xFFFFFFFF); 759 760 $self->new_request(); 761 762 my $header = pack(BINARY_HEADER_FMT, BINARY_REQUEST, $cmd, 763 $key_len, $extra_len, 0, 0, $total_len, 764 $self->{req_id}, $cas_hi, $cas_lo); 765 my $packet = $header . $extra_header . $key . $val; 766 767 return $self->write($packet); 768} 769 770 771sub get_binary_response { 772 my $self = shift; 773 my $header_len = length(pack(BINARY_HEADER_FMT)); 774 my $header; 775 my $body; 776 777 $header = $self->read_known_length($header_len); 778 return (0x101) if(! defined($header)); 779 780 my ($magic, $cmd, $key_len, $extra_len, $datatype, $status, $body_len, 781 $sequence, $cas_hi, $cas_lo) = unpack(BINARY_HEADER_FMT, $header); 782 783 if($magic != BINARY_RESPONSE) { 784 $self->{error_detail} = "Magic number in response: $magic"; 785 return (0x100); 786 } 787 788 $body = $self->read_known_length($body_len); 789 $self->{error} = $self->error_message($status); 790 791 # Packet structure is: header .. extras .. key .. value 792 my $cas = ($cas_hi * (2 ** 32)) + $cas_lo; 793 my $l = $extra_len + $key_len; 794 my $extras = substr $body, 0, $extra_len; 795 my $key = substr $body, $extra_len, $key_len; 796 my $value = substr $body, $l, $body_len - $l; 797 798 return ($status, $value, $key, $extras, $cas, $sequence); 799} 800 801 802sub binary_command { 803 my $self = shift; 804 my ($cmd, $key, $value, $extra_header, $cas) = @_; 805 my $waitTime = $self->{min_wait}; 806 my $maxWait = $self->{max_wait}; 807 my $status; 808 my $wr; 809 810 do { 811 $wr = $self->send_binary_request($cmd, $key, $value, $extra_header, $cas); 812 return undef unless $wr; 813 ($status) = $self->get_binary_response(); 814 if($status == 0x86 && $waitTime < $maxWait) { 815 $self->{temp_errors} += 1; 816 $self->{total_wait} += ( Time::HiRes::usleep($waitTime * 1000) / 1000); 817 $waitTime *= 2; 818 } 819 } while($status == 0x86 && $waitTime <= $maxWait); 820 821 return ($status == 0) ? 1 : undef; 822} 823 824 825sub bin_math { 826 my $self = shift; 827 my ($cmd, $key, $delta, $initial) = @_; 828 my $expires = 0xffffffff; # 0xffffffff means the create flag is NOT set 829 if(defined($initial)) { $expires = $self->{exptime}; } 830 else { $initial = 0; } 831 my $value = undef; 832 833 my $extra_header = pack "NNNNN", 834 ($delta / (2 ** 32)), # delta hi 835 ($delta % (2 ** 32)), # delta lo 836 ($initial / (2 ** 32)), # initial hi 837 ($initial % (2 ** 32)), # initial lo 838 $expires; 839 if( $self->send_binary_request($cmd, $key, '', $extra_header)) { 840 my ($status, $packed_val) = $self->get_binary_response(); 841 if($status == 0) { 842 my ($val_hi, $val_lo) = unpack("NN", $packed_val); 843 $value = ($val_hi * (2 ** 32)) + $val_lo; 844 } 845 } 846 return $value; 847} 848 849 850sub bin_store { 851 my ($self, $cmd, $key, $value, $flags, $exptime, $cas) = @_; 852 $flags = $self->{flags} unless $flags; 853 $exptime = $self->{exptime} unless $exptime; 854 my $extra_header = pack "NN", $flags, $exptime; 855 856 return $self->binary_command($cmd, $key, $value, $extra_header, $cas); 857} 858 859## Pipelined multi-get 860sub get { 861 my $self = shift; 862 my $idx = $#_; # Index of the final key 863 my $cmd = BIN_CMD_GETKQ; # GET + KEY + NOREPLY 864 my $wr; 865 my $sequence = 0; 866 my @results; 867 868 for(my $i = 0 ; $i <= $idx ; $i++) { 869 $cmd = BIN_CMD_GETK if($i == $idx); # Final request gets replies 870 $wr = $self->send_binary_request($cmd, $_[$i], '', ''); 871 } 872 return undef unless $wr; 873 874 while($sequence < $self->{req_id}) { 875 my ($status, $value, $key, $extra, $cas); 876 ($status, $value, $key, $extra, $cas, $sequence) = $self->get_binary_response(); 877 return undef if($status > 0x01); 878 if($status == 0) { 879 my $result = My::Memcache::Result->new($key, unpack("N", $extra), $cas); 880 $result->{value} = $value; 881 push @results, $result; 882 } 883 } 884 $self->{get_results} = \@results; 885 if(@results) { 886 $self->{error} = "OK"; 887 return $results[0]->{value}; 888 } 889 $self->{error} = "NOT_FOUND"; 890 return undef; 891} 892 893 894sub stats { 895 my $self = shift; 896 my $key = shift; 897 my %response, my $status, my $value, my $klen, my $tlen; 898 899 $self->send_binary_request(BIN_CMD_STAT, $key, '', ''); 900 do { 901 ($status, $value, $key) = $self->get_binary_response(); 902 if($status == 0) { 903 $response{$key} = $value; 904 } 905 } while($status == 0 && $key); 906 907 return %response; 908} 909 910sub flush { 911 my ($self, $key, $value) = @_; 912 $self->send_binary_request(BIN_CMD_FLUSH, $key, '', ''); 913 my ($status, $result) = $self->get_binary_response(); 914 return ($status == 0) ? 1 : 0; 915} 916 917sub store { 918 my ($self, $cmd, $key, $value, $flags, $exptime, $cas) = @_; 919 my %cmd_map = ( 920 "set" => BIN_CMD_SET , "add" => BIN_CMD_ADD , "replace" => BIN_CMD_REPLACE , 921 "append" => BIN_CMD_APPEND , "prepend" => BIN_CMD_PREPEND 922 ); 923 return $self->bin_store($cmd_map{$cmd}, $key, $value, $flags, $exptime, $cas); 924} 925 926sub set { 927 my ($self, $key, $value) = @_; 928 return $self->bin_store(BIN_CMD_SET, $key, $value); 929} 930 931sub add { 932 my ($self, $key, $value) = @_; 933 return $self->bin_store(BIN_CMD_ADD, $key, $value); 934} 935 936sub replace { 937 my ($self, $key, $value) = @_; 938 return $self->bin_store(BIN_CMD_REPLACE, $key, $value); 939} 940 941sub append { 942 my ($self, $key, $value) = @_; 943 return $self->binary_command(BIN_CMD_APPEND, $key, $value, ''); 944} 945 946sub prepend { 947 my ($self, $key, $value) = @_; 948 return $self->binary_command(BIN_CMD_PREPEND, $key, $value, ''); 949} 950 951sub delete { 952 my ($self, $key) = @_; 953 return $self->binary_command(BIN_CMD_DELETE, $key, '', ''); 954} 955 956sub incr { 957 my ($self, $key, $delta, $initial) = @_; 958 return $self->bin_math(BIN_CMD_INCR, $key, $delta, $initial); 959} 960 961sub decr { 962 my ($self, $key, $delta, $initial) = @_; 963 return $self->bin_math(BIN_CMD_DECR, $key, $delta, $initial); 964} 965 966 9671; 968