1# $Id: /mirror/perl/Data-Throttler-Memcached/trunk/lib/Data/Throttler/BucketChain/Memcached.pm 8774 2007-11-08T09:43:20.728908Z daisuke  $
2#
3# Copyright (c) 2007 Daisuke Maki <daisuke@endeworks.jp>
4# All rights reserved.
5
6package Data::Throttler::BucketChain::Memcached;
7use strict;
8use warnings;
9use base qw(Class::Accessor::Fast Data::Throttler::BucketChain);
10use Cache::Memcached::Managed;
11use Log::Log4perl qw(:easy);
12
13__PACKAGE__->mk_accessors($_) for qw(id max_items interval cache);
14__PACKAGE__->mk_accessors($_) for qw(buckets bucket_time_span nof_buckets );
15
16sub new
17{
18    my $class = shift;
19    my %args  = @_;
20
21    my $self = bless {
22        max_items   => delete $args{max_items},
23        interval    => delete $args{interval},
24        nof_buckets => delete $args{nof_buckets},
25        id          => delete $args{id} || do {
26            no warnings;
27            require Digest::MD5;
28            Digest::MD5::md5_hex($$, time(), rand(), {})
29        }
30    }, $class;
31
32    my $cache = Cache::Memcached::Managed->new(
33        # defaults
34        data      => '127.0.0.1:11211',
35        namespace => $class,
36        # user-specified
37        %{ $args{cache} || {} },
38        # overrides
39        expiration => $self->interval * 2
40    );
41    $self->cache( $cache );
42
43    if(!$self->max_items or !$self->interval) {
44        LOGDIE "Both max_items and interval need to be defined";
45    }
46
47    if(!$self->nof_buckets) {
48        $self->nof_buckets(10);
49    }
50
51    if($self->nof_buckets > $self->interval) {
52        $self->nof_buckets( $self->interval );
53    }
54
55    $self->reset();
56    return $self;
57}
58
59sub reset
60{
61    my $self = shift;
62
63    $self->cache->delete_group( group => $self->id );
64    $self->buckets([]);
65
66    my $bucket_time_span = int ($self->interval / $self->nof_buckets);
67    $self->bucket_time_span( $bucket_time_span );
68
69    my $time_start = time() - ($self->nof_buckets - 1) * $bucket_time_span;
70
71    for(1..$self->nof_buckets) {
72        my $time_end = $time_start + $bucket_time_span - 1;
73        DEBUG "Creating bucket ", _hms($time_start), " - ", _hms($time_end);
74        push @{$self->{buckets}}, {
75            time  => Data::Throttler::Range->new($time_start, $time_end),
76            id    => join('.', $self->id, $time_start, $time_end),
77            count => {},
78        };
79        $time_start = $time_end + 1;
80    }
81
82    $self->{head_bucket_idx} = 0;
83    $self->{tail_bucket_idx} = $#{$self->{buckets}};
84}
85
86sub as_string
87{
88    my($self) = @_;
89
90    warn "as_string for Data::Throttler::Memcached is currently unimplemented";
91}
92
93sub _hms {
94    my($time) = @_;
95
96    my ($sec,$min,$hour) = localtime($time);
97    return sprintf "%02d:%02d:%02d",
98           $hour, $min, $sec;
99}
100
101sub bucket_add
102{
103    my($self, $time) = @_;
104
105      # ... and append a new one at the end
106    my $time_start = $self->{buckets}->
107                      [$self->{tail_bucket_idx}]->{time}->max + 1;
108    my $time_end   = $time_start + $self->{bucket_time_span} - 1;
109
110    DEBUG "Adding bucket: ", _hms($time_start), " - ", _hms($time_end);
111
112    $self->{tail_bucket_idx}++;
113    $self->{tail_bucket_idx} = 0 if $self->{tail_bucket_idx} >
114                                    $#{$self->{buckets}};
115    $self->{head_bucket_idx}++;
116    $self->{head_bucket_idx} = 0 if $self->{head_bucket_idx} >
117                                    $#{$self->{buckets}};
118
119    $self->{buckets}->[ $self->{tail_bucket_idx} ] = {
120          time  => Data::Throttler::Range->new($time_start, $time_end),
121          id    => join('.', $self->id, $time_start, $time_end),
122          count => {},
123    };
124}
125
126sub try_push
127{
128    my($self, %options) = @_;
129
130    my $key = "_default";
131    $key = $options{key} if defined $options{key};
132
133    my $time = time();
134    $time = $options{time} if defined $options{time};
135
136    my $count = 1;
137    $count = $options{count} if defined $options{count};
138
139    DEBUG "Trying to push $key ", _hms($time), " $count";
140
141    my $b = $self->bucket_find($time);
142
143    if(!$b) {
144       $self->rotate($time);
145       $b = $self->bucket_find($time);
146    }
147
148    # Determine the total count for this key
149    my %count = %{ $self->cache->get_multi(
150        id  => [ map { [ $key, $_->{id} ] } @{ $self->buckets } ],
151        key => 'count'
152    ) };
153    my $val = 0;
154    $val += $_ for values %count;
155
156
157    if($val >= $self->{max_items}) {
158        DEBUG "Not increasing counter $key by $count (already at max $val|$self->{max_items})";
159        return 0;
160    } else {
161        DEBUG "Increasing counter $key by $count ",
162              "($val|$self->{max_items})";
163        $self->cache->incr(
164            value => 1,
165            id    => [ $key, $b->{id} ],
166            key   => 'count'
167        );
168        return 1;
169    }
170
171    LOGDIE "Time $time is outside of bucket range\n", $self->as_string;
172    return undef;
173}
174
1751;
176
177__END__
178
179=head1 NAME
180
181Data::Throttler::BucketChain::Memcached - Backend Store for Data::Throttler::Memcached
182
183=head1 SYNOPSIS
184
185  # Internal use only
186
187=head1 METHODS
188
189=head2 new
190
191=head2 try_push
192
193=head2 as_string
194
195=head2 bucket_add
196
197=head2 reset
198
199=cut