1# $Id: /mirror/perl/Data-Throttler-Memcached/trunk/lib/Data/Throttler/BucketChain/Memcached.pm 8774 2007-11-08T09:43:20.728908Z daisuke $ 2# 3# Copyright (c) 2007 Daisuke Maki <daisuke@endeworks.jp> 4# All rights reserved. 5 6package Data::Throttler::BucketChain::Memcached; 7use strict; 8use warnings; 9use base qw(Class::Accessor::Fast Data::Throttler::BucketChain); 10use Cache::Memcached::Managed; 11use Log::Log4perl qw(:easy); 12 13__PACKAGE__->mk_accessors($_) for qw(id max_items interval cache); 14__PACKAGE__->mk_accessors($_) for qw(buckets bucket_time_span nof_buckets ); 15 16sub new 17{ 18 my $class = shift; 19 my %args = @_; 20 21 my $self = bless { 22 max_items => delete $args{max_items}, 23 interval => delete $args{interval}, 24 nof_buckets => delete $args{nof_buckets}, 25 id => delete $args{id} || do { 26 no warnings; 27 require Digest::MD5; 28 Digest::MD5::md5_hex($$, time(), rand(), {}) 29 } 30 }, $class; 31 32 my $cache = Cache::Memcached::Managed->new( 33 # defaults 34 data => '127.0.0.1:11211', 35 namespace => $class, 36 # user-specified 37 %{ $args{cache} || {} }, 38 # overrides 39 expiration => $self->interval * 2 40 ); 41 $self->cache( $cache ); 42 43 if(!$self->max_items or !$self->interval) { 44 LOGDIE "Both max_items and interval need to be defined"; 45 } 46 47 if(!$self->nof_buckets) { 48 $self->nof_buckets(10); 49 } 50 51 if($self->nof_buckets > $self->interval) { 52 $self->nof_buckets( $self->interval ); 53 } 54 55 $self->reset(); 56 return $self; 57} 58 59sub reset 60{ 61 my $self = shift; 62 63 $self->cache->delete_group( group => $self->id ); 64 $self->buckets([]); 65 66 my $bucket_time_span = int ($self->interval / $self->nof_buckets); 67 $self->bucket_time_span( $bucket_time_span ); 68 69 my $time_start = time() - ($self->nof_buckets - 1) * $bucket_time_span; 70 71 for(1..$self->nof_buckets) { 72 my $time_end = $time_start + $bucket_time_span - 1; 73 DEBUG "Creating bucket ", _hms($time_start), " - ", _hms($time_end); 74 push @{$self->{buckets}}, { 75 time => Data::Throttler::Range->new($time_start, $time_end), 76 id => join('.', $self->id, $time_start, $time_end), 77 count => {}, 78 }; 79 $time_start = $time_end + 1; 80 } 81 82 $self->{head_bucket_idx} = 0; 83 $self->{tail_bucket_idx} = $#{$self->{buckets}}; 84} 85 86sub as_string 87{ 88 my($self) = @_; 89 90 warn "as_string for Data::Throttler::Memcached is currently unimplemented"; 91} 92 93sub _hms { 94 my($time) = @_; 95 96 my ($sec,$min,$hour) = localtime($time); 97 return sprintf "%02d:%02d:%02d", 98 $hour, $min, $sec; 99} 100 101sub bucket_add 102{ 103 my($self, $time) = @_; 104 105 # ... and append a new one at the end 106 my $time_start = $self->{buckets}-> 107 [$self->{tail_bucket_idx}]->{time}->max + 1; 108 my $time_end = $time_start + $self->{bucket_time_span} - 1; 109 110 DEBUG "Adding bucket: ", _hms($time_start), " - ", _hms($time_end); 111 112 $self->{tail_bucket_idx}++; 113 $self->{tail_bucket_idx} = 0 if $self->{tail_bucket_idx} > 114 $#{$self->{buckets}}; 115 $self->{head_bucket_idx}++; 116 $self->{head_bucket_idx} = 0 if $self->{head_bucket_idx} > 117 $#{$self->{buckets}}; 118 119 $self->{buckets}->[ $self->{tail_bucket_idx} ] = { 120 time => Data::Throttler::Range->new($time_start, $time_end), 121 id => join('.', $self->id, $time_start, $time_end), 122 count => {}, 123 }; 124} 125 126sub try_push 127{ 128 my($self, %options) = @_; 129 130 my $key = "_default"; 131 $key = $options{key} if defined $options{key}; 132 133 my $time = time(); 134 $time = $options{time} if defined $options{time}; 135 136 my $count = 1; 137 $count = $options{count} if defined $options{count}; 138 139 DEBUG "Trying to push $key ", _hms($time), " $count"; 140 141 my $b = $self->bucket_find($time); 142 143 if(!$b) { 144 $self->rotate($time); 145 $b = $self->bucket_find($time); 146 } 147 148 # Determine the total count for this key 149 my %count = %{ $self->cache->get_multi( 150 id => [ map { [ $key, $_->{id} ] } @{ $self->buckets } ], 151 key => 'count' 152 ) }; 153 my $val = 0; 154 $val += $_ for values %count; 155 156 157 if($val >= $self->{max_items}) { 158 DEBUG "Not increasing counter $key by $count (already at max $val|$self->{max_items})"; 159 return 0; 160 } else { 161 DEBUG "Increasing counter $key by $count ", 162 "($val|$self->{max_items})"; 163 $self->cache->incr( 164 value => 1, 165 id => [ $key, $b->{id} ], 166 key => 'count' 167 ); 168 return 1; 169 } 170 171 LOGDIE "Time $time is outside of bucket range\n", $self->as_string; 172 return undef; 173} 174 1751; 176 177__END__ 178 179=head1 NAME 180 181Data::Throttler::BucketChain::Memcached - Backend Store for Data::Throttler::Memcached 182 183=head1 SYNOPSIS 184 185 # Internal use only 186 187=head1 METHODS 188 189=head2 new 190 191=head2 try_push 192 193=head2 as_string 194 195=head2 bucket_add 196 197=head2 reset 198 199=cut