1 /*
2 Copyright 2013-2015 Skytechnology sp. z o.o.
3
4 This file is part of LizardFS.
5
6 LizardFS is free software: you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation, version 3.
9
10 LizardFS is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with LizardFS. If not, see <http://www.gnu.org/licenses/>.
17 */
18
19 #include "common/platform.h"
20 #include "common/io_limiting.h"
21
22 #include <algorithm>
23 #include <thread>
24
25 #include "common/io_limits_config_loader.h"
26 #include "common/massert.h"
27 #include "protocol/MFSCommunication.h"
28
29 using namespace ioLimiting;
30
registerReconfigure(ReconfigurationFunction reconfigure)31 void Limiter::registerReconfigure(ReconfigurationFunction reconfigure) {
32 reconfigure_ = reconfigure;
33 }
34
now()35 SteadyTimePoint RTClock::now() {
36 return SteadyClock::now();
37 }
38
sleepUntil(SteadyTimePoint time)39 void RTClock::sleepUntil(SteadyTimePoint time) {
40 return std::this_thread::sleep_until(time);
41 }
42
attempt(uint64_t size)43 bool Group::attempt(uint64_t size) {
44 if (lastRequestEndTime_ + shared_.delta < clock_.now()) {
45 reserve_ = 0;
46 }
47 if (size <= reserve_) {
48 reserve_ -= size;
49 return true;
50 } else {
51 return false;
52 }
53 }
54
enqueue(uint64_t size)55 Group::PendingRequests::iterator Group::enqueue(uint64_t size) {
56 PendingRequests::iterator it = pendingRequests_.emplace(pendingRequests_.end(), size);
57 return it;
58 }
59
dequeue(PendingRequests::iterator it)60 void Group::dequeue(PendingRequests::iterator it) {
61 pastRequests_.emplace_back(clock_.now(), it->size);
62 pendingRequests_.erase(it);
63 }
64
notifyQueue()65 void Group::notifyQueue() {
66 if (!pendingRequests_.empty()) {
67 pendingRequests_.front().cond.notify_one();
68 }
69 }
70
isFirst(PendingRequests::iterator it) const71 bool Group::isFirst(PendingRequests::iterator it) const {
72 return it == pendingRequests_.begin();
73 }
74
askMaster(std::unique_lock<std::mutex> & lock)75 void Group::askMaster(std::unique_lock<std::mutex>& lock) {
76 while (!pastRequests_.empty()
77 && ((pastRequests_.front().creationTime + shared_.delta) < clock_.now())) {
78 pastRequests_.pop_front();
79 }
80 uint64_t size = 0;
81 for (const auto& request : pendingRequests_) {
82 size += request.size;
83 }
84 for (const auto& request : pastRequests_) {
85 size += request.size;
86 }
87 sassert(size > reserve_);
88 size -= reserve_;
89 lastRequestStartTime_ = clock_.now();
90 lock.unlock();
91 uint64_t receivedSize = shared_.limiter.request(groupId_, size);
92 lock.lock();
93 lastRequestEndTime_ = clock_.now();
94 lastRequestSuccessful_ = receivedSize >= size;
95 reserve_ += receivedSize;
96 }
97
wait(uint64_t size,SteadyTimePoint deadline,std::unique_lock<std::mutex> & lock)98 uint8_t Group::wait(uint64_t size, SteadyTimePoint deadline, std::unique_lock<std::mutex>& lock) {
99 PendingRequests::iterator it = enqueue(size);
100 it->cond.wait(lock, [this, it]() {return isFirst(it);});
101 uint8_t status = LIZARDFS_ERROR_TIMEOUT;
102 while (clock_.now() < deadline) {
103 if (dead_) {
104 status = LIZARDFS_ERROR_ENOENT;
105 break;
106 }
107 if (attempt(size)) {
108 status = LIZARDFS_STATUS_OK;
109 break;
110 }
111 if (!lastRequestSuccessful_) {
112 SteadyTimePoint nextRequestTime = lastRequestStartTime_ + shared_.delta;
113 if (nextRequestTime > deadline) {
114 break;
115 }
116 lock.unlock();
117 clock_.sleepUntil(nextRequestTime);
118 lock.lock();
119 if (dead_) {
120 continue;
121 }
122 }
123 askMaster(lock);
124 }
125 dequeue(it);
126 notifyQueue();
127 return status;
128 }
129
die()130 void Group::die() {
131 dead_ = true;
132 }
133