/*
   Copyright 2013-2014 EditShare, 2013-2015 Skytechnology sp. z o.o.

   This file is part of LizardFS.

   LizardFS is free software: you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation, version 3.

   LizardFS is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with LizardFS. If not, see <http://www.gnu.org/licenses/>.
 */

#include "common/platform.h"
#include "mount/global_io_limiter.h"

#include <unistd.h>
#include <algorithm>

#include "protocol/cltoma.h"
#include "protocol/matocl.h"
#include "common/token_bucket.h"
#include "common/slogger.h"
#include "mount/io_limit_group.h"

using namespace ioLimiting;

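// MasterLimiter delegates I/O limiting decisions to the master server. The constructor
// registers a handler for LIZ_MATOCL_IOLIMITS_CONFIG packets so that configuration
// updates pushed by the master are applied to this mount; the destructor unregisters it.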
MasterLimiter::MasterLimiter() : iolimitsConfigHandler_(*this), configVersion_(0) {
	auto res = fs_register_packet_type_handler(LIZ_MATOCL_IOLIMITS_CONFIG, &iolimitsConfigHandler_);
	(void)res;
	assert(res);
}

MasterLimiter::~MasterLimiter() {
	auto res = fs_unregister_packet_type_handler(LIZ_MATOCL_IOLIMITS_CONFIG, &iolimitsConfigHandler_);
	(void)res;
	assert(res);
}

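// Ask the master to grant 'size' bytes of bandwidth for 'groupId'. Sends a
// cltoma::iolimit request and returns the size granted in the matching reply, or 0
// when the exchange fails or the reply refers to a different configuration or group.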
uint64_t MasterLimiter::request(const IoLimitGroupId& groupId, uint64_t size) {
	MessageBuffer buffer;
	cltoma::iolimit::serialize(buffer, 0, configVersion_, groupId, size);
	uint8_t status = fs_raw_sendandreceive(buffer, LIZ_MATOCL_IOLIMIT);
	if (status != LIZARDFS_STATUS_OK) {
		lzfs_pretty_syslog(LOG_NOTICE, "Sending IOLIMIT returned status %s", lizardfs_error_string(status));
		return 0;
	}
	uint32_t receivedMsgid, receivedConfigVersion;
	std::string receivedGroupId;
	uint64_t receivedSize;
	matocl::iolimit::deserialize(buffer, receivedMsgid, receivedConfigVersion, receivedGroupId,
			receivedSize);
	if (receivedConfigVersion != configVersion_) {
		lzfs_pretty_syslog(LOG_NOTICE,
				"Received unexpected IOLIMIT config version %" PRIu32 " instead of %" PRIu32,
				receivedConfigVersion, configVersion_);
		return 0;
	}
	if (receivedGroupId != groupId) {
		lzfs_pretty_syslog(LOG_NOTICE, "Received IOLIMIT group %s instead of %s",
				receivedGroupId.c_str(), groupId.c_str());
		return 0;
	}
	return receivedSize;
}

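// Handle a LIZ_MATOCL_IOLIMITS_CONFIG packet from the master: deserialize the new
// configuration (version, the delta_us period, the subsystem used to classify
// processes and the list of limit groups) and hand it over to the parent limiter.
// Returns false when the packet is malformed.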
bool MasterLimiter::IolimitsConfigHandler::handle(MessageBuffer buffer) {
	try {
		uint32_t configVersion;
		uint32_t delta_us;
		std::string subsystem;
		std::vector<std::string> groups;
		matocl::iolimitsConfig::deserialize(buffer.data(), buffer.size(),
				configVersion, delta_us, subsystem, groups);
		parent_.configVersion_ = configVersion;
		parent_.reconfigure_(delta_us, subsystem, groups);
		lzfs_pretty_syslog(LOG_INFO, "Received IO limits configuration update from master");
		return true;
	} catch (IncorrectDeserializationException& ex) {
		lzfs_pretty_syslog(LOG_ERR, "Malformed MATOCL_IOLIMITS_CONFIG: %s", ex.what());
		return false;
	}
}

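// Local counterpart of MasterLimiter::request(): grants bandwidth from the
// mount-local limits database instead of asking the master.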
uint64_t MountLimiter::request(const IoLimitGroupId& groupId, uint64_t size) {
	return database_.request(SteadyClock::now(), groupId, size);
}

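// Load limits from a locally provided configuration and reconfigure the limiter.
// 1000 us is the delta handed to reconfigure_(); 200 is the third argument of
// IoLimitsDatabase::setLimits() (presumably an accumulation window).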
void MountLimiter::loadConfiguration(const IoLimitsConfigLoader& config) {
	database_.setLimits(SteadyClock::now(), config.limits(), 200);
	reconfigure_(1000, config.subsystem(), database_.getGroups());
}

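// Find the descriptor of 'groupId', falling back to the unclassified group when the
// id is unknown; returns nullptr if neither exists. Expected to be called with
// mutex_ held, as waitForRead() does.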
std::shared_ptr<Group> LimiterProxy::getGroup(const IoLimitGroupId& groupId) const {
	Groups::const_iterator groupIt = groups_.find(groupId);
	if (groupIt == groups_.end()) {
		groupIt = groups_.find(kUnclassified);
	}
	if (groupIt == groups_.end()) {
		return nullptr;
	}
	return groupIt->second;
}

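// Block until the group that 'pid' belongs to is granted 'size' bytes of bandwidth,
// or until 'deadline' passes. Returns immediately when limiting is disabled, and
// retries when the group disappears because of a concurrent reconfiguration
// (LIZARDFS_ERROR_ENOENT).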
uint8_t LimiterProxy::waitForRead(const pid_t pid, const uint64_t size, SteadyTimePoint deadline) {
	std::unique_lock<std::mutex> lock(mutex_);
	uint8_t status;
	do {
		if (!enabled_) {
			return LIZARDFS_STATUS_OK;
		}
		IoLimitGroupId groupId = getIoLimitGroupIdNoExcept(pid, subsystem_);
		// Grab a shared_ptr reference on the group descriptor so that reconfigure() can
		// quickly unreference this group from the groups_ map without waiting for us.
		std::shared_ptr<Group> group = getGroup(groupId);
		if (!group) {
			return LIZARDFS_ERROR_EPERM;
		}
		status = group->wait(size, deadline, lock);
	} while (status == LIZARDFS_ERROR_ENOENT); // loop if the group disappeared due to reconfiguration
	return status;
}

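// Reads and writes share a single bandwidth limit, so writing simply reuses the
// read path.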
uint8_t LimiterProxy::waitForWrite(const pid_t pid, const uint64_t size, SteadyTimePoint deadline) {
	return waitForRead(pid, size, deadline);
}

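// Apply a new configuration: merge the sorted list of new group ids with the
// (already sorted) groups_ map in one pass, removing groups that disappeared,
// creating groups that appeared, and recreating every surviving group when the
// subsystem changed. Waiters on removed groups are woken up via die().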
void LimiterProxy::reconfigure(uint32_t delta_us, const std::string& subsystem,
		const std::vector<IoLimitGroupId>& groupIds) {

	std::vector<std::reference_wrapper<const IoLimitGroupId>>
		newGroupIds(groupIds.begin(), groupIds.end());
	std::sort(newGroupIds.begin(), newGroupIds.end(), std::less<std::string>());

	std::unique_lock<std::mutex> lock(mutex_);

	const bool differentSubsystem = (subsystem_ != subsystem);
	auto newIter = newGroupIds.begin();
	auto oldIter = groups_.begin();

	while (true) {
		while (oldIter != groups_.end() &&
				(newIter == newGroupIds.end() ||
				 newIter->get() > oldIter->first)) {
			// no group with this name in the new configuration;
			// notify waiters that there is nothing to wait for
			oldIter->second->die();
			// make the group unreachable through groups_
			oldIter = groups_.erase(oldIter);
			// the last waiter's shared_ptr will free the memory
		}
		if (newIter == newGroupIds.end()) {
			// stale groups removed, no more new groups, we are done
			break;
		}
		if (oldIter == groups_.end() ||
				oldIter->first > newIter->get()) {
			// a new group has been added
			const std::string& groupId = newIter->get();
			oldIter = groups_.insert(oldIter, std::make_pair(groupId,
					std::make_shared<Group>(shared_, groupId, clock_)));
		} else {
			// an existing group with the same name
			if (differentSubsystem) {
				// it is not the same group anymore:
				// notify waiters
				oldIter->second->die();
				// unreference the old group and create a new one
				const std::string& groupId = newIter->get();
				oldIter->second = std::make_shared<Group>(shared_, groupId, clock_);
			}
		}
		newIter++;
		oldIter++;
	}
	shared_.delta = std::chrono::microseconds(delta_us);
	subsystem_ = subsystem;
	enabled_ = (subsystem_ != "" || groups_.count(kUnclassified) == 1);
}