1 /*
2    Copyright 2013-2015 Skytechnology sp. z o.o.
3 
4    This file is part of LizardFS.
5 
6    LizardFS is free software: you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation, version 3.
9 
10    LizardFS is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13    GNU General Public License for more details.
14 
15    You should have received a copy of the GNU General Public License
16    along with LizardFS. If not, see <http://www.gnu.org/licenses/>.
17  */
18 
19 #include "common/platform.h"
20 #include "common/io_limiting.h"
21 
22 #include <algorithm>
23 #include <thread>
24 
25 #include "common/io_limits_config_loader.h"
26 #include "common/massert.h"
27 #include "protocol/MFSCommunication.h"
28 
29 using namespace ioLimiting;
30 
registerReconfigure(ReconfigurationFunction reconfigure)31 void Limiter::registerReconfigure(ReconfigurationFunction reconfigure) {
32 	reconfigure_ = reconfigure;
33 }
34 
now()35 SteadyTimePoint RTClock::now() {
36 	return SteadyClock::now();
37 }
38 
sleepUntil(SteadyTimePoint time)39 void RTClock::sleepUntil(SteadyTimePoint time) {
40 	return std::this_thread::sleep_until(time);
41 }
42 
attempt(uint64_t size)43 bool Group::attempt(uint64_t size) {
44 	if (lastRequestEndTime_ + shared_.delta < clock_.now()) {
45 		reserve_ = 0;
46 	}
47 	if (size <= reserve_) {
48 		reserve_ -= size;
49 		return true;
50 	} else {
51 		return false;
52 	}
53 }
54 
enqueue(uint64_t size)55 Group::PendingRequests::iterator Group::enqueue(uint64_t size) {
56 	PendingRequests::iterator it = pendingRequests_.emplace(pendingRequests_.end(), size);
57 	return it;
58 }
59 
dequeue(PendingRequests::iterator it)60 void Group::dequeue(PendingRequests::iterator it) {
61 	pastRequests_.emplace_back(clock_.now(), it->size);
62 	pendingRequests_.erase(it);
63 }
64 
notifyQueue()65 void Group::notifyQueue() {
66 	if (!pendingRequests_.empty()) {
67 		pendingRequests_.front().cond.notify_one();
68 	}
69 }
70 
isFirst(PendingRequests::iterator it) const71 bool Group::isFirst(PendingRequests::iterator it) const {
72 	return it == pendingRequests_.begin();
73 }
74 
askMaster(std::unique_lock<std::mutex> & lock)75 void Group::askMaster(std::unique_lock<std::mutex>& lock) {
76 	while (!pastRequests_.empty()
77 			&& ((pastRequests_.front().creationTime + shared_.delta) < clock_.now())) {
78 		pastRequests_.pop_front();
79 	}
80 	uint64_t size = 0;
81 	for (const auto& request : pendingRequests_) {
82 		size += request.size;
83 	}
84 	for (const auto& request : pastRequests_) {
85 		size += request.size;
86 	}
87 	sassert(size > reserve_);
88 	size -= reserve_;
89 	lastRequestStartTime_ = clock_.now();
90 	lock.unlock();
91 	uint64_t receivedSize = shared_.limiter.request(groupId_, size);
92 	lock.lock();
93 	lastRequestEndTime_ = clock_.now();
94 	lastRequestSuccessful_ = receivedSize >= size;
95 	reserve_ += receivedSize;
96 }
97 
wait(uint64_t size,SteadyTimePoint deadline,std::unique_lock<std::mutex> & lock)98 uint8_t Group::wait(uint64_t size, SteadyTimePoint deadline, std::unique_lock<std::mutex>& lock) {
99 	PendingRequests::iterator it = enqueue(size);
100 	it->cond.wait(lock, [this, it]() {return isFirst(it);});
101 	uint8_t status = LIZARDFS_ERROR_TIMEOUT;
102 	while (clock_.now() < deadline) {
103 		if (dead_) {
104 			status = LIZARDFS_ERROR_ENOENT;
105 			break;
106 		}
107 		if (attempt(size)) {
108 			status = LIZARDFS_STATUS_OK;
109 			break;
110 		}
111 		if (!lastRequestSuccessful_) {
112 			SteadyTimePoint nextRequestTime = lastRequestStartTime_ + shared_.delta;
113 			if (nextRequestTime > deadline) {
114 				break;
115 			}
116 			lock.unlock();
117 			clock_.sleepUntil(nextRequestTime);
118 			lock.lock();
119 			if (dead_) {
120 				continue;
121 			}
122 		}
123 		askMaster(lock);
124 	}
125 	dequeue(it);
126 	notifyQueue();
127 	return status;
128 }
129 
die()130 void Group::die() {
131 	dead_ = true;
132 }
133