1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 #pragma once 19 20 #include <deque> 21 #include <tuple> 22 #include <atomic> 23 #include <chrono> 24 #include <string> 25 #include <climits> 26 27 #include "tscore/ink_config.h" 28 #include "ts/ts.h" 29 #include "utilities.h" 30 31 constexpr auto QUEUE_DELAY_TIME = std::chrono::milliseconds{200}; // Examine the queue every 200ms 32 using QueueTime = std::chrono::time_point<std::chrono::system_clock>; 33 34 /////////////////////////////////////////////////////////////////////////////// 35 // Base class for all limiters 36 // 37 template <class T> class RateLimiter 38 { 39 using QueueItem = std::tuple<T, TSCont, QueueTime>; 40 41 public: RateLimiter()42 RateLimiter() : _queue_lock(TSMutexCreate()), _active_lock(TSMutexCreate()) {} 43 ~RateLimiter()44 virtual ~RateLimiter() 45 { 46 TSMutexDestroy(_queue_lock); 47 TSMutexDestroy(_active_lock); 48 } 49 50 // Reserve / release a slot from the active resource limits. Reserve will return 51 // false if we are unable to reserve a slot. 52 bool reserve()53 reserve() 54 { 55 TSReleaseAssert(_active <= limit); 56 TSMutexLock(_active_lock); 57 if (_active < limit) { 58 ++_active; 59 TSMutexUnlock(_active_lock); // Reduce the critical section, release early 60 TSDebug(PLUGIN_NAME, "Reserving a slot, active entities == %u", active()); 61 return true; 62 } else { 63 TSMutexUnlock(_active_lock); 64 return false; 65 } 66 } 67 68 void release()69 release() 70 { 71 TSMutexLock(_active_lock); 72 --_active; 73 TSMutexUnlock(_active_lock); 74 TSDebug(PLUGIN_NAME, "Releasing a slot, active entities == %u", active()); 75 } 76 77 // Current size of the active_in connections 78 unsigned active()79 active() const 80 { 81 return _active.load(); 82 } 83 84 // Current size of the queue 85 unsigned size()86 size() const 87 { 88 return _size.load(); 89 } 90 91 // Is the queue full (at it's max size)? 92 bool full()93 full() const 94 { 95 return (_size == max_queue); 96 } 97 98 void push(T elem,TSCont cont)99 push(T elem, TSCont cont) 100 { 101 QueueTime now = std::chrono::system_clock::now(); 102 103 TSMutexLock(_queue_lock); 104 _queue.push_front(std::make_tuple(elem, cont, now)); 105 ++_size; 106 TSMutexUnlock(_queue_lock); 107 } 108 109 QueueItem pop()110 pop() 111 { 112 QueueItem item; 113 114 TSMutexLock(_queue_lock); 115 if (!_queue.empty()) { 116 item = std::move(_queue.back()); 117 _queue.pop_back(); 118 --_size; 119 } 120 TSMutexUnlock(_queue_lock); 121 122 return item; 123 } 124 125 bool hasOldEntity(QueueTime now)126 hasOldEntity(QueueTime now) const 127 { 128 TSMutexLock(_queue_lock); 129 if (!_queue.empty()) { 130 QueueItem item = _queue.back(); 131 TSMutexUnlock(_queue_lock); // A little ugly but this reduces the critical section for the lock a little bit. 132 133 std::chrono::milliseconds age = std::chrono::duration_cast<std::chrono::milliseconds>(now - std::get<2>(item)); 134 135 return (age >= max_age); 136 } else { 137 TSMutexUnlock(_queue_lock); 138 return false; 139 } 140 } 141 142 // Initialize a new instance of this rate limiter 143 bool initialize(int argc, const char *argv[]); 144 145 // These are the configurable portions of this limiter, public so sue me. 146 unsigned limit = 100; // Arbitrary default, probably should be a required config 147 unsigned max_queue = UINT_MAX; // No queue limit, but if sets will give an immediate error if at max 148 std::chrono::milliseconds max_age = std::chrono::milliseconds::zero(); // Max age (ms) in the queue 149 std::string description = ""; 150 151 private: 152 std::atomic<unsigned> _active = 0; // Current active number of txns. This has to always stay <= limit above 153 std::atomic<unsigned> _size = 0; // Current size of the pending queue of txns. This should aim to be < _max_queue 154 155 TSMutex _queue_lock, _active_lock; // Resource locks 156 std::deque<QueueItem> _queue; // Queue for the pending TXN's. ToDo: Should also move (see below) 157 }; 158