1# -*- cperl -*-
2# Copyright (c) 2013, 2021, Oracle and/or its affiliates.
3# All rights reserved.
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License, version 2.0,
7# as published by the Free Software Foundation.
8#
9# This program is also distributed with certain software (including
10# but not limited to OpenSSL) that is licensed under separate terms,
11# as designated in a particular file or component or in included license
12# documentation.  The authors of MySQL hereby grant you an additional
13# permission to link the program and your derivative works with the
14# separately licensed software that they have included with MySQL.
15#
16# This program is distributed in the hope that it will be useful,
17# but WITHOUT ANY WARRANTY; without even the implied warranty of
18# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19# GNU General Public License, version 2.0, for more details.
20#
21# You should have received a copy of the GNU General Public License
22# along with this program; if not, write to the Free Software
23# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA
24
25
26########## Memcache Client Library for Perl
27###
28###  $mc = My::Memcache->new()          create an ascii-protocol client
29###  $mc = My::Memcache::Binary->new()  create a binary-protocol client
30###
31###  $mc->connect(host, port)           returns 1 on success, 0 on failure
32###
33###  $mc->{error}                       holds most recent error/status message
34###
35###  $mc->store(cmd, key, value, ...)   alternate API for set/add/replace/append/prepend
36###  $mc->set(key, value)               returns 1 on success, 0 on failure
37###  $mc->add(key, value)               set if record does not exist
38###  $mc->replace(key, value)           set if record exists
39###  $mc->append(key, value)            append value to existing data
40###  $mc->prepend(key, value)           prepend value to existing data
41###
42###  $mc->get(key, [ key ...])          returns a value or undef
43###  $mc->next_result()                 Fetch results after get()
44###
45###  $mc->delete(key)                   returns 1 on success, 0 on failure
46###  $mc->stats(stat_key)               get stats; returns a hash
47###  $mc->incr(key, amount, [initial])  returns the new value or undef
48###  $mc->decr(key, amount, [initial])  like incr.
49###                                           The third argument is used in
50###                                           the binary protocol ONLY.
51###  $mc->flush()                       flush_all
52###
53###  $mc->set_expires(sec)              Set TTL for all store operations
54###  $mc->set_flags(int_flags)          Set numeric flags for store operations
55###
56###  $mc->note_config_version()
57###    Store the generation number of the running config in the filesystem,
58###    for later use by wait_for_reconf()
59###
60###  $mc->wait_for_reconf()
61###    Wait for NDB/Memcache to complete online reconfiguration.
62###    Returns the generation number of the newly running configuration,
63###    or zero on timeout/error.
64
65################ TODO ################################
66###  * Support explicit binary k/q commands with pipelining
67###  * Implement TOUCH & GAT commands
68###  * Support UDP
69###  * Standardize APIs to take (key, value, hashref-of-options)
70
71use strict;
72use IO::Socket::INET;
73use IO::File;
74use Carp;
75use Time::HiRes;
76use Errno qw( EWOULDBLOCK );
77
78######## Memcache Result
79
80package My::Memcache::Result;
81
# Build a result record for one item returned by get().
# Arguments: key, flags, and an optional CAS value (defaults to 0).
# The "value" slot starts out undefined and is filled in by the caller.
sub new {
  my $class = shift;
  my ($key, $flags, $cas) = @_;

  my %record = (
    "key"   => $key,
    "flags" => $flags,
    "cas"   => defined($cas) ? $cas : 0,
    "value" => undef,
  );
  return bless \%record, $class;
}
92
93
94######## Memcache Client
95
96package My::Memcache;
97
# Create an unconnected client object populated with default settings.
sub new {
  my ($class) = @_;

  my %state = (
    # request bookkeeping
    "created"         => 1,
    "req_id"          => 0,
    "cf_gen"          => 0,
    "failed"          => 0,
    # error reporting
    "error"           => "OK",
    "error_detail"    => "",
    # back-off for temporary errors: min/max wait are in msec, both powers of 2
    "min_wait"        => 4,
    "max_wait"        => 8192,
    "temp_errors"     => 0,
    "total_wait"      => 0,
    # store / get options and results
    "flags"           => 0,
    "exptime"         => 0,
    "has_cas"         => 0,
    "get_with_cas"    => 0,
    "get_results"     => undef,
    # network I/O: io_timeout is in seconds (possibly fractional);
    # io_timeout * max_read_tries = read timeout
    "io_timeout"      => 5.0,
    "sysread_size"    => 512,
    "max_read_tries"  => 6,
    "max_write_tries" => 6,
    "read_try"        => 0,
    "readbuf"         => "",
    "buflen"          => 0,
  );
  return bless { %state }, $class;
}
112
113
# Abbreviate a long string for diagnostic output.
# Strings longer than 25 characters are shown as the first 10 and last 10
# characters joined by "...", followed by the total length; anything shorter
# is returned unchanged.
sub summarize {
  my ($text) = @_;
  my $size = length $text;

  return $text unless $size > 25;
  return substr($text, 0, 10) . "..." . substr($text, -10) . " [len $size]";
}
124
125
126# fail() is called when an MTR test fails
sub fail {
  # Builds a diagnostic report about the current client state and dies with
  # a full stack trace (Carp::confess).  Any additional arguments are
  # appended to the report as extra message text.
  my ($self, @extras) = @_;

  # next_result() / get_server_error_stats() below could end up back here;
  # the "failed" flag keeps that from recursing.
  if($self->{failed}) {
    print STDERR " /// My::Memcache::fail() called recursively.\n";
    return;
  }
  $self->{failed} = 1;

  my $msg =
      "error: "         . $self->{error}       ."\t".
      "read_try: "      . $self->{read_try}    ."\t".
      "protocol: "      . $self->protocol()    ."\n".
      "req_id: "        . $self->{req_id}      ."\t".
      "temp err wait: " . $self->{total_wait}  ." msec.\n";

  $msg .= "detail: " . $self->{error_detail} . "\n";
  $msg .= "buffer: " . summarize($self->{readbuf}) . "\n";

  my $r = $self->next_result();
  $msg .= "value: " . summarize($r->{value}) . "\n" if($r);

  # Append every caller-supplied message.  Iterating over the list (rather
  # than shifting until a false value) keeps messages such as "0" and
  # anything following them.
  foreach my $extra (@extras) {
    $msg .= $extra if(defined($extra));
  }
  $msg .= "\n";

  $msg .= $self->get_server_error_stats();

  # Load Average on linux
  if(open(my $fd, "<", "/proc/loadavg")) {
    my $loadavg = <$fd>;
    close($fd);
    $msg .= "Load Avg: " . $loadavg if(defined($loadavg));
  }

  $msg .= "====~~~~____~~~~====\n";

  Carp::confess($msg);
}
164
165
166# Attempt a new connection to memcached to flush the server's error log
167# and obtain error statistics
168
sub get_server_error_stats {
  my ($self) = @_;

  # Use a separate binary-protocol client so the state of this
  # connection is left untouched.
  my $probe = My::Memcache::Binary->new();
  unless($probe->connect($self->{host}, $self->{port})) {
    return "Attempted new server connection to fetch error statistics but failed.\n";
  }

  my %stats = $probe->stats("errors");  # also flushes server log
  my $report = "Server error stats:\n";
  foreach my $name (keys(%stats)) {
    $report .= sprintf("%s : %s\n", $name, $stats{$name});
  }
  return $report;
}
184
185
186# Common code to ASCII and BINARY protocols:
187
# Open a TCP connection to host:port.  Returns 1 on success; on failure
# sets error to CONNECTION_FAILED and returns 0.
sub connect {
  my ($self, $host, $port) = @_;
  my $sock;

  # Wait for memcached to be ready, up to ten seconds
  # (100 attempts, 100 msec apart).
  my $attempts_left = 100;
  while(1) {
    $sock = IO::Socket::INET->new(PeerAddr => "$host:$port", Proto => "tcp");
    last if($sock);
    Time::HiRes::usleep(100 * 1000);
    last unless(--$attempts_left);
  }

  unless($sock) {
    $self->{error} = "CONNECTION_FAILED";
    return 0;
  }

  $sock->blocking(0);    # Set non-blocking

  # Bit vector with only this socket's descriptor set, for use with select()
  my $fdset = '';
  vec($fdset, fileno($sock), 1) = 1;

  $self->{fdset}      = $fdset;
  $self->{connection} = $sock;
  $self->{host}       = $host;
  $self->{port}       = $port;
  return 1;
}
218
# Destructor: close the server connection, if one was ever opened.
sub DESTROY {
  my ($self) = @_;
  my $conn = $self->{connection};
  $conn->close() if($conn);
}
225
# Set the expiration time used as the default by later store operations.
sub set_expires {
  my ($self, $seconds) = @_;
  $self->{exptime} = $seconds;
}
230
# Set the numeric flags used as the default by later store operations.
sub set_flags {
  my ($self, $int_flags) = @_;
  $self->{flags} = $int_flags;
}
235
236# Some member variables are per-request.
237# Clear them in preparation for a new request, and increment the request counter.
sub new_request {
  my ($self) = @_;

  # Reset the per-request status fields in one go
  @{$self}{qw(error read_try has_cas)} = ("OK", 0, 0);

  $self->{req_id}++;
  $self->{get_results} = undef;
}
246
# Remove and return the first pending result of the most recent get(),
# or undef when none remain.
sub next_result {
  my ($self) = @_;
  return shift @{ $self->{get_results} };
}
251
252# note_config_version and wait_for_reconf are only for use by mysql-test-run
sub note_config_version {
  # Fetches the "Running" config generation from "stats reconf", writes it
  # to $MYSQLTEST_VARDIR/tmp/memcache_cf_gen and caches it in cf_gen.
  # Dies if the file cannot be opened for writing.
  my $self = shift;

  my $vardir = $ENV{MYSQLTEST_VARDIR};
  my $path = "$vardir/tmp/memcache_cf_gen";

  # Fetch the memcached current config generation number and save it
  my %stats = $self->stats("reconf");
  my $F = IO::File->new($path, "w")
    or die "note_config_version(): cannot open $path for writing: $!";
  my $ver = $stats{"Running"};
  print $F "$ver\n";
  $F->close();

  $self->{cf_gen} = $ver;
}
266
# Wait for the server to move past the last noted config generation.
# Returns the new generation number, or a false value on timeout.
sub wait_for_reconf {
  my ($self) = @_;

  # No generation cached on this client object: fall back to the value
  # saved in the filesystem, or zero when the file cannot be opened.
  if($self->{cf_gen} == 0) {
    my $saved_gen = 0;
    my $gen_file = IO::File->new("$ENV{MYSQLTEST_VARDIR}/tmp/memcache_cf_gen", "r");
    if(defined $gen_file) {
      $saved_gen = <$gen_file>;
      chomp($saved_gen);
      undef $gen_file;
    }
    $self->{cf_gen} = $saved_gen;
  }

  my $target_gen = $self->{cf_gen} + 1;
  my $running_gen = $self->wait_for_config_generation($target_gen);

  if($running_gen > 0) {
    $self->{cf_gen} = $running_gen;
  } else {
    print STDERR "Wait for config generation $target_gen timed out.\n";
  }
  return $running_gen;
}
293
294# wait_for_config_generation($cf_gen)
# Wait until memcached is running a config generation >= $cf_gen
296# Returns 0 on error/timeout, or the actual running generation number
297#
sub wait_for_config_generation {
  my ($self, $cf_gen) = @_;
  my $running_gen = 0;
  my $tries_left = 100;   # 100 retries x 100 ms = 10s

  # Poll "stats reconf" until the reported generation is high enough
  # or the retry budget is used up.
  until($running_gen || ! $tries_left) {
    Time::HiRes::usleep(100 * 1000);
    my %stats = $self->stats("reconf");
    my $reported = $stats{"Running"};
    if($reported >= $cf_gen) {
      $running_gen = $reported;
    } else {
      $tries_left -= 1;
    }
  }
  return $running_gen;
}
316
317#  -----------------------------------------------------------------------
318#  --------------        Low-level Network Handling        ---------------
319#  -----------------------------------------------------------------------
320
321# Utility function sets error based on network error & returns false.
sub socket_error {
  # Arguments:
  #   $retval  result code of the failed network call:
  #              1      -> CONNECTION_CLOSED
  #              0      -> NETWORK_TIMEOUT
  #              other  -> NETWORK_ERROR (with the current $! text)
  #   $detail  optional text stored in error_detail (left unchanged if false)
  # Always returns 0 so callers can write "return $self->socket_error(...)".
  my ($self, $retval, $detail) = @_;

  if(! defined($retval)) {
    # send() and sysread() return undef on failure.  Numerically undef
    # compares equal to 0, so without this check a hard error would be
    # reported as a timeout.
    $self->{error} = "NETWORK_ERROR: " . $!;
  } elsif($retval == 1) {
    $self->{error} = "CONNECTION_CLOSED";
  } elsif($retval == 0) {
    $self->{error} = "NETWORK_TIMEOUT";
  } else {
    $self->{error} = "NETWORK_ERROR: " . $!;
  }
  $self->{error_detail} = $detail if($detail);

  return 0;
}
338
339# $mc->write(packet).  Returns true on success, false on error.
sub write {
  # Sends the whole packet over the non-blocking socket, waiting with
  # select() for the socket to become writable before each send().
  # Gives up after max_write_tries select() calls that report no ready socket.
  my ($self, $packet) = @_;
  my $len = length($packet);
  my $nsent = 0;
  my $attempt = 0;

  if(! $self->{connection}->connected()) {
    return $self->socket_error(0, "write(): not connected");
  }

  while($nsent < $len) {
    # select() overwrites its bit-vector arguments with the set of ready
    # descriptors, so hand it a copy; otherwise a single timeout would
    # clear $self->{fdset} for every later call.
    my $writable = $self->{fdset};
    my $nready = select(undef, $writable, undef, $self->{io_timeout});
    if($nready < 1) {
      if(++$attempt >= $self->{max_write_tries}) {
        return $self->socket_error($nready, "write(): select() returned $nready");
      }
      next;
    }

    my $r = $self->{connection}->send(substr($packet, $nsent));
    if(defined($r) && $r > 0) {
      $nsent += $r;
    } elsif($! != Errno::EWOULDBLOCK) {
      # send() returns undef on failure; report that as an error (-1)
      # rather than letting undef be read as a timeout.
      return $self->socket_error(defined($r) ? $r : -1,
                                 "write(): send() errno $!");
    }
  }
  return 1;
}
369
370# $mc->read(desired_size).  Low-level read.  Returns true on success,
371# appends data to readbuf, and sets buflen.  Returns false on error.
sub read {
  # Waits up to io_timeout for the socket to become readable, then makes one
  # non-blocking sysread() of at most $length bytes, appended at offset
  # buflen of readbuf.  "No data yet" is not an error: the sub returns true
  # and the caller retries.
  my ($self, $length) = @_;
  my $sock = $self->{connection};

  return 1 unless($length > 0);

  # select() overwrites its bit-vector arguments with the set of ready
  # descriptors, so hand it a copy; otherwise a single timeout would clear
  # $self->{fdset} and every later select() would wait on nothing.
  my $readable = $self->{fdset};
  my $nready = select($readable, undef, undef, $self->{io_timeout});
  return $self->socket_error($nready, "read(): select() $!") if($nready < 0);

  my $r = $sock->sysread($self->{readbuf}, $length, $self->{buflen});
  if(! defined($r)) {
    # sysread() returns undef on error
    if($! != Errno::EWOULDBLOCK) {
      return $self->socket_error(-1, "read(): sysread() $!");
    }
  } elsif($r == 0) {
    # Zero bytes for a non-zero request means end of file
    return $self->socket_error(1, "read(): sysread() returned EOF");
  } else {
    $self->{buflen} += $r;
  }
  return 1;
}
391
392# Utility routine; assumes $len is available on buffer.
sub chop_from_buffer {
  my ($self, $len) = @_;

  # Split the buffer into the leading $len bytes and the remainder
  my $head = substr($self->{readbuf}, 0, $len);
  my $tail = substr($self->{readbuf}, $len);

  $self->{readbuf} = $tail;
  $self->{buflen} -= $len;
  return $head;
}
402
403# Returns a line if available; otherwise undef
sub get_line_from_buffer {
  my ($self) = @_;

  my $eol = index($self->{readbuf}, "\r\n");
  return undef if($eol < 0);

  # The returned line includes its terminating \r\n (2 bytes)
  return $self->chop_from_buffer($eol + 2);
}
414
415# Returns length if available; otherwise undef
sub get_length_from_buffer {
  my ($self, $len) = @_;

  # Not enough buffered data yet
  return undef unless($self->{buflen} >= $len);

  return $self->chop_from_buffer($len);
}
425
426# Read up to newline.  Returns a line, or sets and returns error.
sub read_line {
  my ($self) = @_;

  $self->{read_try} = 0;
  while($self->{read_try} < $self->{max_read_tries}) {
    $self->{read_try}++;

    my $line = $self->get_line_from_buffer();
    if(defined($line)) {
      $self->normalize_error($line);  # handle server error responses
      return $line;
    }

    # No complete line buffered yet; pull more data from the socket
    return $self->{error} unless($self->read($self->{sysread_size}));
  }

  $self->socket_error(0, "read_line(): timeout");
  return $self->{error};
}
449
450# Read <length> bytes.  Returns the data, or returns undef and sets error.
sub read_known_length {
  my ($self, $len) = @_;
  my $chunk;

  $self->{read_try} = 0;
  until($self->{read_try} >= $self->{max_read_tries}) {
    $self->{read_try}++;

    $chunk = $self->get_length_from_buffer($len);
    return $chunk if(defined($chunk));

    # Ask only for the bytes still missing
    my $missing = $len - $self->{buflen};
    return undef unless($self->read($missing));
  }

  # Perhaps the read completed on the final attempt
  $chunk = $self->get_length_from_buffer($len);
  return $chunk if(defined($chunk));

  $self->socket_error(0, "read_known_length(): timeout");
  return undef;
}
472
473
474#  -----------------------------------------------------------------------
475#  ------------------          ASCII PROTOCOL         --------------------
476#  -----------------------------------------------------------------------
477
# Name of the wire protocol spoken by this client class.
sub protocol {
  my $name = "ascii";
  return $name;
}
481
# Record a protocol error and return undef.
# An error that is already set (anything other than "OK") is preserved;
# error_detail is replaced only when a true $detail is supplied.
sub protocol_error {
  my ($self, $detail) = @_;

  $self->{error} = "PROTOCOL_ERROR" if($self->{error} eq "OK");
  $self->{error_detail} = $detail if($detail);

  return undef;
}
494
# Send one ascii-protocol request and return the first line of the reply.
# While the server answers SERVER_TEMPORARY_ERROR the request is resent
# with exponential back-off: the wait starts at min_wait msec and doubles
# after every sleep, up to and including max_wait msec, followed by one
# final attempt.
# On a network failure the return value is the error string in $self->{error}.
sub ascii_command {
  my ($self, $packet) = @_;
  my $waitTime = $self->{min_wait};
  my $maxWait = $self->{max_wait};
  my $reply;
  my $retry;

  do {
    $self->new_request();

    # Do not wait for a reply to a request that was never sent
    return $self->{error} unless($self->write($packet));

    $reply = $self->read_line();

    # One condition decides both "sleep" and "loop again".  Using < for one
    # and <= for the other leaves the loop spinning forever, without
    # sleeping, once the wait time reaches max_wait.
    $retry = ($self->{error} eq "SERVER_TEMPORARY_ERROR" && $waitTime <= $maxWait);
    if($retry) {
      $self->{temp_errors} += 1;
      $self->{total_wait} += ( Time::HiRes::usleep($waitTime * 1000) / 1000);
      $waitTime *= 2;
    }
  } while($retry);

  return $reply;
}
515
516
# Delete a key.  Returns 1 if the server deleted it, 0 if it was not found
# or the server reported an error, and undef (protocol error) otherwise.
sub delete {
  my ($self, $key) = @_;
  my $response = $self->ascii_command("delete $key\r\n");

  if($response =~ /^DELETED/) {
    return 1;
  }
  if($response =~ /^(?:NOT_FOUND|SERVER_ERROR)/) {
    return 0;
  }
  return $self->protocol_error("delete() got response: $response");
}
526
527
# Generic storage command.
#   $cmd      "set", "add", "replace", "append", "prepend" or "cas"
#   $flags    defaults to the value given to set_flags()
#   $exptime  defaults to the value given to set_expires()
#   $cas_chk  when > 0 and $cmd is "cas" or "replace", a "cas" command is
#             sent so the store only succeeds if the item is unchanged
# Returns 1 if the server answered STORED, 0 for NOT_STORED, EXISTS,
# NOT_FOUND or SERVER_ERROR, and undef (protocol error) for anything else.
sub store {
  my ($self, $cmd, $key, $value, $flags, $exptime, $cas_chk) = @_;
  $flags   = $self->{flags}   unless $flags;
  $exptime = $self->{exptime} unless $exptime;
  my $packet;
  if(($cmd eq "cas" || $cmd eq "replace") && $cas_chk > 0)
  {
    # Field order in the text protocol is:
    #   cas <key> <flags> <exptime> <bytes> <cas unique>
    # i.e. the byte count comes before the cas value.
    $packet = sprintf("cas %s %d %d %d %d\r\n%s\r\n",
                      $key, $flags, $exptime, length($value), $cas_chk, $value);
  }
  else
  {
    $packet = sprintf("%s %s %d %d %d\r\n%s\r\n",
                      $cmd, $key, $flags, $exptime, length($value), $value);
  }
  my $response = $self->ascii_command($packet);
  return 1 if($response =~ /^STORED/);
  return 0 if($response =~ /^(?:NOT_STORED|EXISTS|NOT_FOUND|SERVER_ERROR)/);
  return $self->protocol_error("store() got response: $response");
}
551
# Store a value unconditionally; see store() for the return values.
sub set {
  my $self = shift;
  my ($key, $value, $flags, $exptime) = @_;
  return $self->store("set", $key, $value, $flags, $exptime);
}
556
557
# Store a value only if the key does not exist yet; see store().
sub add {
  my $self = shift;
  my ($key, $value, $flags, $exptime) = @_;
  return $self->store("add", $key, $value, $flags, $exptime);
}
562
563
# Append a value to the existing data for a key; see store().
sub append {
  my $self = shift;
  my ($key, $value, $flags, $exptime) = @_;
  return $self->store("append", $key, $value, $flags, $exptime);
}
568
569
# Prepend a value to the existing data for a key; see store().
sub prepend {
  my $self = shift;
  my ($key, $value, $flags, $exptime) = @_;
  return $self->store("prepend", $key, $value, $flags, $exptime);
}
574
575
# Store a value only if the key already exists; an optional CAS value is
# passed through to store().
sub replace {
  my $self = shift;
  my ($key, $value, $flags, $exptime, $cas) = @_;
  return $self->store("replace", $key, $value, $flags, $exptime, $cas);
}
580
581
# Fetch one or more keys ("gets" instead of "get" when get_with_cas is set;
# that flag is reset on every call).
# Returns the value of the first result, or undef on error or when nothing
# was found (error is then NOT_FOUND).  All results are kept in get_results
# for retrieval with next_result().
sub get {
  my $self = shift;
  my @results;
  my $keys = "";
  $keys .= "$_ " for(@_);
  my $command = $self->{get_with_cas} ? "gets" : "get";
  $self->{get_with_cas} = 0; # CHECK, THEN RESET FOR NEXT CALL
  my $response = $self->ascii_command("$command $keys\r\n");
  return undef if($self->{error} ne "OK");
  while ($response ne "END\r\n")
  {
    # Take the captures from the match result itself.  $1..$4 keep the
    # values of an earlier successful match when this one fails, so testing
    # them afterwards can accept a malformed line.
    my @fields = ($response =~ /^VALUE (\S+) (\d+) (\d+) ?(\d+)?/);
    if(! @fields) {
      return $self->protocol_error("GET response: $response");
    }
    my ($key, $flags, $length, $cas) = @fields;

    my $result = My::Memcache::Result->new($key, $flags, $cas);
    my $value = $self->read_known_length($length);
    return undef if(!defined($value));
    $result->{value} = $value;
    $self->read_line();  # Get trailing \r\n after value
    $self->{has_cas} = 1 if($cas);
    push @results, $result;
    $response = $self->read_line();
  }
  $self->{get_results} = \@results;
  return $results[0]->{value} if @results;
  $self->{error} = "NOT_FOUND";
  return undef;
}
611
612
# Shared implementation of incr and decr for the ascii protocol.
# Returns the first run of digits in the reply (the new value), or undef
# when the reply starts with NOT_FOUND, contains ERROR, or holds no number
# (the last case is recorded as a protocol error).
sub _txt_math {
  my ($self, $cmd, $key, $delta) = @_;
  my $response = $self->ascii_command("$cmd $key $delta \r\n");

  if ($response =~ /^NOT_FOUND/ || $response =~ /ERROR/) {
    return undef;
  }

  # Use $1 only inside the successful-match branch; after a failed match it
  # still holds the capture of an earlier successful match.
  if ($response =~ /(\d+)/) {
    return $1;
  }
  return $self->protocol_error("MATH response: $response");
}
625
626
# Increment a numeric value; returns the new value or undef.
sub incr {
  my $self = shift;
  my ($key, $delta) = @_;
  return $self->_txt_math("incr", $key, $delta);
}
631
632
# Decrement a numeric value; returns the new value or undef.
sub decr {
  my $self = shift;
  my ($key, $delta) = @_;
  return $self->_txt_math("decr", $key, $delta);
}
637
638
# Run "stats [key]" and return the reply as a hash of stat name => value.
# An "ERROR" reply returns whatever was collected so far.  An unparsable
# reply line records a protocol error and returns an empty list.
sub stats {
  my $self = shift;
  my $key = shift || "";

  $self->new_request();
  $self->write("stats $key\r\n");

  my %response = ();
  my $line = $self->read_line();
  while($line !~ /^END/) {
    return %response if $line eq "ERROR\r\n";
    if(($line) && ($line =~ /^STAT(\s+)(\S+)(\s+)(\S+)/)) {
      $response{$2} = $4;
    } else {
      # protocol_error() returns undef.  Returning that directly would hand
      # a one-element list to callers that assign the result to a hash.
      $self->protocol_error("STATS response line: $line");
      return ();
    }
    $line = $self->read_line();
  }

  return %response;
}
660
661
# Send flush_all; returns true when the resulting error status is "OK".
sub flush {
  my ($self) = @_;
  $self->ascii_command("flush_all\r\n");
  return ($self->{error} eq "OK");
}
668
669
# Try to provide consistent error messages across ascii & binary protocols
sub normalize_error {
  my ($self, $reply) = @_;

  # Complete ascii reply lines and the status name each one maps to.
  # Any reply not listed here is treated as "OK".
  my %status_for = (
    "STORED\r\n"     => "OK",
    "EXISTS\r\n"     => "KEY_EXISTS",
    "NOT_FOUND\r\n"  => "NOT_FOUND",
    "NOT_STORED\r\n" => "NOT_STORED",

    "CLIENT_ERROR value too big\r\n"     => "VALUE_TOO_LARGE",
    "CLIENT_ERROR invalid arguments\r\n" => "INVALID_ARGUMENTS",

    "SERVER_ERROR object too large for cache\r\n" => "VALUE_TOO_LARGE",
    "SERVER_ERROR not my vbucket\r\n"             => "NOT_MY_VBUCKET",
    "SERVER_ERROR out of memory\r\n"              => "SERVER_OUT_OF_MEMORY",
    "SERVER_ERROR not supported\r\n"              => "NOT_SUPPORTED",
    "SERVER_ERROR internal\r\n"                   => "INTERNAL_ERROR",
    "SERVER_ERROR temporary failure\r\n"          => "SERVER_TEMPORARY_ERROR",
  );

  my $status = $status_for{$reply};
  $self->{error} = $status ? $status : "OK";
  return 0;
}
691
692#  -----------------------------------------------------------------------
693#  ------------------         BINARY PROTOCOL         --------------------
694#  -----------------------------------------------------------------------
695
696package My::Memcache::Binary;
697BEGIN { @My::Memcache::Binary::ISA = qw(My::Memcache); }
# Binary protocol packet header.  The pack template fields are, in order:
# magic, opcode, key length, extras length, data type, status,
# total body length, sequence number, CAS high word, CAS low word.
use constant {
  BINARY_HEADER_FMT => "CCnCCnNNNN",
  BINARY_REQUEST    => 0x80,
  BINARY_RESPONSE   => 0x81,
};

# Binary protocol command opcodes
use constant {
  BIN_CMD_GET     => 0x00,
  BIN_CMD_SET     => 0x01,
  BIN_CMD_ADD     => 0x02,
  BIN_CMD_REPLACE => 0x03,
  BIN_CMD_DELETE  => 0x04,
  BIN_CMD_INCR    => 0x05,
  BIN_CMD_DECR    => 0x06,
  BIN_CMD_QUIT    => 0x07,
  BIN_CMD_FLUSH   => 0x08,
  BIN_CMD_NOOP    => 0x0A,
  BIN_CMD_GETK    => 0x0C,
  BIN_CMD_GETKQ   => 0x0D,
  BIN_CMD_APPEND  => 0x0E,
  BIN_CMD_PREPEND => 0x0F,
  BIN_CMD_STAT    => 0x10,
};
717
718
# Name of the wire protocol spoken by this client class.
sub protocol {
  my $name = "binary";
  return $name;
}
722
723
# Translate a numeric status code into its status name.
# Returns undef for a code that is not in the table.
sub error_message {
  my ($self, $code) = @_;

  my %name_of = (
    # status codes carried in response packets
    0x00  => "OK",
    0x01  => "NOT_FOUND",
    0x02  => "KEY_EXISTS",
    0x03  => "VALUE_TOO_LARGE",
    0x04  => "INVALID_ARGUMENTS",
    0x05  => "NOT_STORED",
    0x06  => "NON_NUMERIC_VALUE",
    0x07  => "NOT_MY_VBUCKET",
    0x81  => "UNKNOWN_COMMAND",
    0x82  => "SERVER_OUT_OF_MEMORY",
    0x83  => "NOT_SUPPORTED",
    0x84  => "INTERNAL_ERROR",
    0x85  => "SERVER_BUSY",
    0x86  => "SERVER_TEMPORARY_ERROR",
    # codes returned by get_binary_response() itself
    0x100 => "PROTOCOL_ERROR",
    0x101 => "NETWORK_ERROR",
  );

  return exists($name_of{$code}) ? $name_of{$code} : undef;
}
746
747# Returns true on success, false on error
sub send_binary_request {
  my ($self, $cmd, $key, $val, $extra_header, $cas) = @_;
  $cas ||= 0;

  # Starts a new request, which also advances req_id (sent as the sequence)
  $self->new_request();

  my $key_len   = length($key);
  my $extra_len = length($extra_header);
  my $body_len  = $key_len + length($val) + $extra_len;

  # The 64-bit CAS travels as two 32-bit words, high word first
  my @cas_words = (($cas >> 32) & 0xFFFFFFFF, $cas & 0xFFFFFFFF);

  my $header = pack(BINARY_HEADER_FMT,
                    BINARY_REQUEST, $cmd, $key_len, $extra_len,
                    0, 0, $body_len, $self->{req_id}, @cas_words);

  # Packet structure is: header .. extras .. key .. value
  return $self->write($header . $extra_header . $key . $val);
}
769
770
# Read one binary-protocol response packet.
# Returns the list (status, value, key, extras, cas, sequence).
# On failure only a status is returned:
#   0x101  the header or body could not be read (error set by the reader)
#   0x100  the packet did not start with the response magic number
sub get_binary_response {
  my $self = shift;
  my $header_len = length(pack(BINARY_HEADER_FMT));
  my $header;
  my $body;

  $header = $self->read_known_length($header_len);
  return (0x101) if(! defined($header));

  my ($magic, $cmd, $key_len, $extra_len, $datatype, $status, $body_len,
      $sequence, $cas_hi, $cas_lo) = unpack(BINARY_HEADER_FMT, $header);

  if($magic != BINARY_RESPONSE) {
    # Record the failure in error too, so it does not stay at "OK"
    $self->{error} = $self->error_message(0x100);
    $self->{error_detail} = "Magic number in response: $magic";
    return (0x100);
  }

  $body = $self->read_known_length($body_len);
  # A failed body read has already set error; do not overwrite it or try
  # to split an undefined body.
  return (0x101) if(! defined($body));

  my $status_name = $self->error_message($status);
  $self->{error} = defined($status_name)
                 ? $status_name
                 : sprintf("UNKNOWN_STATUS_0x%04x", $status);

  # Packet structure is: header .. extras .. key .. value
  my $cas = ($cas_hi * (2 ** 32)) + $cas_lo;
  my $l = $extra_len + $key_len;
  my $extras = substr $body, 0, $extra_len;
  my $key    = substr $body, $extra_len, $key_len;
  my $value  = substr $body, $l, $body_len - $l;

  return ($status, $value, $key, $extras, $cas, $sequence);
}
800
801
# Send one binary request and read its response.
# While the server answers with status 0x86 (SERVER_TEMPORARY_ERROR) the
# request is resent with exponential back-off: the wait starts at min_wait
# msec and doubles after every sleep, up to and including max_wait msec,
# followed by one final attempt.
# Returns 1 when the final status is 0, otherwise undef.
sub binary_command {
  my $self = shift;
  my ($cmd, $key, $value, $extra_header, $cas) = @_;
  my $waitTime = $self->{min_wait};
  my $maxWait = $self->{max_wait};
  my $status;
  my $wr;
  my $retry;

  do {
    $wr = $self->send_binary_request($cmd, $key, $value, $extra_header, $cas);
    return undef unless $wr;
    ($status) = $self->get_binary_response();

    # One condition decides both "sleep" and "loop again".  Using < for one
    # and <= for the other leaves the loop spinning forever, without
    # sleeping, once the wait time reaches max_wait.
    $retry = ($status == 0x86 && $waitTime <= $maxWait);
    if($retry) {
      $self->{temp_errors} += 1;
      $self->{total_wait} += ( Time::HiRes::usleep($waitTime * 1000) / 1000);
      $waitTime *= 2;
    }
  } while($retry);

  return ($status == 0) ? 1 : undef;
}
823
824
# Shared implementation of incr and decr for the binary protocol.
# Returns the new value, or undef if the request could not be sent or the
# response status is not 0.
sub bin_math {
  my ($self, $cmd, $key, $delta, $initial) = @_;
  my $two32 = 2 ** 32;
  my $expires;

  if(defined($initial)) {
    $expires = $self->{exptime};
  } else {
    $initial = 0;
    $expires = 0xffffffff;  # 0xffffffff means the create flag is NOT set
  }

  # Extras: 64-bit delta and 64-bit initial value, each as hi/lo 32-bit
  # words, followed by the 32-bit expiration.
  my $extra_header = pack("NNNNN",
                          ($delta   / $two32), ($delta   % $two32),
                          ($initial / $two32), ($initial % $two32),
                          $expires);

  return undef unless($self->send_binary_request($cmd, $key, '', $extra_header));

  my ($status, $packed_val) = $self->get_binary_response();
  return undef unless($status == 0);

  my ($val_hi, $val_lo) = unpack("NN", $packed_val);
  return ($val_hi * $two32) + $val_lo;
}
848
849
# Send a storage command whose extras are the 32-bit flags and expiration.
# Flags and exptime fall back to the values set on the client object.
sub bin_store {
  my ($self, $cmd, $key, $value, $flags, $exptime, $cas) = @_;
  $flags   ||= $self->{flags};
  $exptime ||= $self->{exptime};

  my $extras = pack("NN", $flags, $exptime);
  return $self->binary_command($cmd, $key, $value, $extras, $cas);
}
858
859## Pipelined multi-get
sub get {
  my ($self, @keys) = @_;
  my $last = $#keys;   # Index of the final key
  my $sent;

  # Every key but the last is requested with GETKQ (GET + KEY + NOREPLY);
  # the final request uses GETK, which gets replies.
  foreach my $i (0 .. $last) {
    my $opcode = ($i == $last) ? BIN_CMD_GETK : BIN_CMD_GETKQ;
    $sent = $self->send_binary_request($opcode, $keys[$i], '', '');
  }
  return undef unless $sent;

  # Read responses until the one answering the final request arrives
  my @results;
  my $sequence = 0;
  while($sequence < $self->{req_id}) {
    my ($status, $value, $key, $extra, $cas, $seq) = $self->get_binary_response();
    $sequence = $seq;
    return undef if($status > 0x01);
    next unless($status == 0);

    my $result = My::Memcache::Result->new($key, unpack("N", $extra), $cas);
    $result->{value} = $value;
    push @results, $result;
  }

  $self->{get_results} = \@results;
  unless(@results) {
    $self->{error} = "NOT_FOUND";
    return undef;
  }
  $self->{error} = "OK";
  return $results[0]->{value};
}
892
893
# Request statistics (optionally for one stat group) and return them as a
# hash of stat name => value.  The server sends one packet per statistic and
# finishes with a packet whose key is empty.  Returns an empty hash if the
# request cannot be sent; reading stops at the first non-zero status.
sub stats {
  my ($self, $key) = @_;
  $key = "" unless defined($key);   # match the ascii client's default
  my %response;

  return %response
    unless $self->send_binary_request(BIN_CMD_STAT, $key, '', '');

  while(1) {
    my ($status, $value, $stat_key) = $self->get_binary_response();
    last unless($status == 0);
    # The empty-key packet only marks the end of the list; it is not a
    # statistic and must not be stored.
    last unless(defined($stat_key) && length($stat_key));
    $response{$stat_key} = $value;
  }

  return %response;
}
909
# Send a FLUSH command; returns 1 when the response status is 0, else 0.
sub flush {
  my ($self, $key) = @_;
  $self->send_binary_request(BIN_CMD_FLUSH, $key, '', '');
  my ($status) = $self->get_binary_response();
  return 0 unless($status == 0);
  return 1;
}
916
# Generic storage command for the binary protocol.
#   $cmd  "set", "add", "replace", "append" or "prepend"
# Returns 1 on success and undef on failure.  An unsupported $cmd sets error
# to INVALID_ARGUMENTS and returns undef without contacting the server.
sub store {
  my ($self, $cmd, $key, $value, $flags, $exptime, $cas) = @_;
  my %cmd_map = (
    "set" => BIN_CMD_SET , "add" => BIN_CMD_ADD , "replace" => BIN_CMD_REPLACE ,
    "append" => BIN_CMD_APPEND , "prepend" => BIN_CMD_PREPEND
  );

  # An unmapped command would look up as undef, which packs as opcode 0
  # (GET); reject it here instead.
  if(! (defined($cmd) && exists($cmd_map{$cmd}))) {
    $self->{error} = "INVALID_ARGUMENTS";
    $self->{error_detail} = "store(): unsupported command: $cmd";
    return undef;
  }
  return $self->bin_store($cmd_map{$cmd}, $key, $value, $flags, $exptime, $cas);
}
925
# Store a value unconditionally.  Optional flags and exptime are honoured,
# as in the ascii client; when omitted, bin_store() applies the defaults.
# Returns 1 on success, undef on failure.
sub set {
  my ($self, $key, $value, $flags, $exptime) = @_;
  return $self->bin_store(BIN_CMD_SET, $key, $value, $flags, $exptime);
}
930
# Store a value only if the key does not exist yet.  Optional flags and
# exptime are honoured, as in the ascii client; when omitted, bin_store()
# applies the defaults.  Returns 1 on success, undef on failure.
sub add {
  my ($self, $key, $value, $flags, $exptime) = @_;
  return $self->bin_store(BIN_CMD_ADD, $key, $value, $flags, $exptime);
}
935
# Store a value only if the key already exists.  Optional flags, exptime and
# CAS value are honoured, as in the ascii client; when omitted, bin_store()
# applies the defaults.  Returns 1 on success, undef on failure.
sub replace {
  my ($self, $key, $value, $flags, $exptime, $cas) = @_;
  return $self->bin_store(BIN_CMD_REPLACE, $key, $value, $flags, $exptime, $cas);
}
940
# Append a value to existing data (sent with an empty extras section).
sub append {
  my $self = shift;
  my ($key, $value) = @_;
  return $self->binary_command(BIN_CMD_APPEND, $key, $value, '');
}
945
# Prepend a value to existing data (sent with an empty extras section).
sub prepend {
  my $self = shift;
  my ($key, $value) = @_;
  return $self->binary_command(BIN_CMD_PREPEND, $key, $value, '');
}
950
# Delete a key; returns 1 on success, undef otherwise.
sub delete {
  my $self = shift;
  my ($key) = @_;
  return $self->binary_command(BIN_CMD_DELETE, $key, '', '');
}
955
# Increment a numeric value; see bin_math() for the role of $initial.
sub incr {
  my $self = shift;
  my ($key, $delta, $initial) = @_;
  return $self->bin_math(BIN_CMD_INCR, $key, $delta, $initial);
}
960
# Decrement a numeric value; see bin_math() for the role of $initial.
sub decr {
  my $self = shift;
  my ($key, $delta, $initial) = @_;
  return $self->bin_math(BIN_CMD_DECR, $key, $delta, $initial);
}
965
966
9671;
968