1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 #pragma once
19 
20 #include <deque>
21 #include <tuple>
22 #include <atomic>
23 #include <chrono>
24 #include <string>
25 #include <climits>
26 
27 #include "tscore/ink_config.h"
28 #include "ts/ts.h"
29 #include "utilities.h"
30 
31 constexpr auto QUEUE_DELAY_TIME = std::chrono::milliseconds{200}; // Examine the queue every 200ms
32 using QueueTime                 = std::chrono::time_point<std::chrono::system_clock>;
33 
34 ///////////////////////////////////////////////////////////////////////////////
35 // Base class for all limiters
36 //
37 template <class T> class RateLimiter
38 {
39   using QueueItem = std::tuple<T, TSCont, QueueTime>;
40 
41 public:
RateLimiter()42   RateLimiter() : _queue_lock(TSMutexCreate()), _active_lock(TSMutexCreate()) {}
43 
~RateLimiter()44   virtual ~RateLimiter()
45   {
46     TSMutexDestroy(_queue_lock);
47     TSMutexDestroy(_active_lock);
48   }
49 
50   // Reserve / release a slot from the active resource limits. Reserve will return
51   // false if we are unable to reserve a slot.
52   bool
reserve()53   reserve()
54   {
55     TSReleaseAssert(_active <= limit);
56     TSMutexLock(_active_lock);
57     if (_active < limit) {
58       ++_active;
59       TSMutexUnlock(_active_lock); // Reduce the critical section, release early
60       TSDebug(PLUGIN_NAME, "Reserving a slot, active entities == %u", active());
61       return true;
62     } else {
63       TSMutexUnlock(_active_lock);
64       return false;
65     }
66   }
67 
68   void
release()69   release()
70   {
71     TSMutexLock(_active_lock);
72     --_active;
73     TSMutexUnlock(_active_lock);
74     TSDebug(PLUGIN_NAME, "Releasing a slot, active entities == %u", active());
75   }
76 
77   // Current size of the active_in connections
78   unsigned
active()79   active() const
80   {
81     return _active.load();
82   }
83 
84   // Current size of the queue
85   unsigned
size()86   size() const
87   {
88     return _size.load();
89   }
90 
91   // Is the queue full (at it's max size)?
92   bool
full()93   full() const
94   {
95     return (_size == max_queue);
96   }
97 
98   void
push(T elem,TSCont cont)99   push(T elem, TSCont cont)
100   {
101     QueueTime now = std::chrono::system_clock::now();
102 
103     TSMutexLock(_queue_lock);
104     _queue.push_front(std::make_tuple(elem, cont, now));
105     ++_size;
106     TSMutexUnlock(_queue_lock);
107   }
108 
109   QueueItem
pop()110   pop()
111   {
112     QueueItem item;
113 
114     TSMutexLock(_queue_lock);
115     if (!_queue.empty()) {
116       item = std::move(_queue.back());
117       _queue.pop_back();
118       --_size;
119     }
120     TSMutexUnlock(_queue_lock);
121 
122     return item;
123   }
124 
125   bool
hasOldEntity(QueueTime now)126   hasOldEntity(QueueTime now) const
127   {
128     TSMutexLock(_queue_lock);
129     if (!_queue.empty()) {
130       QueueItem item = _queue.back();
131       TSMutexUnlock(_queue_lock); // A little ugly but this reduces the critical section for the lock a little bit.
132 
133       std::chrono::milliseconds age = std::chrono::duration_cast<std::chrono::milliseconds>(now - std::get<2>(item));
134 
135       return (age >= max_age);
136     } else {
137       TSMutexUnlock(_queue_lock);
138       return false;
139     }
140   }
141 
142   // Initialize a new instance of this rate limiter
143   bool initialize(int argc, const char *argv[]);
144 
145   // These are the configurable portions of this limiter, public so sue me.
146   unsigned limit                    = 100;      // Arbitrary default, probably should be a required config
147   unsigned max_queue                = UINT_MAX; // No queue limit, but if sets will give an immediate error if at max
148   std::chrono::milliseconds max_age = std::chrono::milliseconds::zero(); // Max age (ms) in the queue
149   std::string description           = "";
150 
151 private:
152   std::atomic<unsigned> _active = 0; // Current active number of txns. This has to always stay <= limit above
153   std::atomic<unsigned> _size   = 0; // Current size of the pending queue of txns. This should aim to be < _max_queue
154 
155   TSMutex _queue_lock, _active_lock; // Resource locks
156   std::deque<QueueItem> _queue;      // Queue for the pending TXN's. ToDo: Should also move (see below)
157 };
158