1 /*
2  * Copyright (C) 2001-2009 Jacek Sieka, arnetheduck on gmail point com
3  *
4  * This program is free software; you can redistribute it and/or modify
5  * it under the terms of the GNU General Public License as published by
6  * the Free Software Foundation; either version 2 of the License, or
7  * (at your option) any later version.
8  *
9  * This program is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12  * GNU General Public License for more details.
13  *
14  * You should have received a copy of the GNU General Public License
15  * along with this program; if not, write to the Free Software
16  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17  */
18 
19 #include "stdinc.h"
20 #include "DCPlusPlus.h"
21 
22 #include "UploadManager.h"
23 
24 #include "ConnectionManager.h"
25 #include "LogManager.h"
26 #include "ShareManager.h"
27 #include "ClientManager.h"
28 #include "FilteredFile.h"
29 #include "ZUtils.h"
30 #include "HashManager.h"
31 #include "AdcCommand.h"
32 #include "FavoriteManager.h"
33 #include "CryptoManager.h"
34 #include "Upload.h"
35 #include "UserConnection.h"
36 
37 #include <functional>
38 
39 namespace dcpp {
40 
// Name constant for the "Uploads" area; it is not referenced in the part of the file visible here.
static const string UPLOAD_AREA("Uploads");
42 
UploadManager()43 UploadManager::UploadManager() throw() : running(0), extra(0), lastGrant(0) {
44 	ClientManager::getInstance()->addListener(this);
45 	TimerManager::getInstance()->addListener(this);
46 }
47 
~UploadManager()48 UploadManager::~UploadManager() throw() {
49 	TimerManager::getInstance()->removeListener(this);
50 	ClientManager::getInstance()->removeListener(this);
51 	while(true) {
52 		{
53 			Lock l(cs);
54 			if(uploads.empty())
55 				break;
56 		}
57 		Thread::sleep(100);
58 	}
59 }
60 
prepareFile(UserConnection & aSource,const string & aType,const string & aFile,int64_t aStartPos,int64_t aBytes,bool listRecursive)61 bool UploadManager::prepareFile(UserConnection& aSource, const string& aType, const string& aFile, int64_t aStartPos, int64_t aBytes, bool listRecursive) {
62 	dcdebug("Preparing %s %s " I64_FMT " " I64_FMT " %d\n", aType.c_str(), aFile.c_str(), aStartPos, aBytes, listRecursive);
63 
64 	if(aFile.empty() || aStartPos < 0 || aBytes < -1 || aBytes == 0) {
65 		aSource.fileNotAvail("Invalid request");
66 		return false;
67 	}
68 
69 	InputStream* is = 0;
70 	int64_t start = 0;
71 	int64_t size = 0;
72 
73 	bool userlist = (aFile == Transfer::USER_LIST_NAME_BZ || aFile == Transfer::USER_LIST_NAME);
74 	bool free = userlist;
75 
76 	string sourceFile;
77 	Transfer::Type type;
78 
79 	try {
80 		if(aType == Transfer::names[Transfer::TYPE_FILE]) {
81 			sourceFile = ShareManager::getInstance()->toReal(aFile);
82 
83 			if(aFile == Transfer::USER_LIST_NAME) {
84 				// Unpack before sending...
85 				string bz2 = File(sourceFile, File::READ, File::OPEN).read();
86 				string xml;
87 				CryptoManager::getInstance()->decodeBZ2(reinterpret_cast<const uint8_t*>(bz2.data()), bz2.size(), xml);
88 				// Clear to save some memory...
89 				string().swap(bz2);
90 				is = new MemoryInputStream(xml);
91 				start = 0;
92 				size = xml.size();
93 			} else {
94 				File* f = new File(sourceFile, File::READ, File::OPEN);
95 
96 				start = aStartPos;
97 				int64_t sz = f->getSize();
98 				size = (aBytes == -1) ? sz - start : aBytes;
99 
100 				if((start + size) > sz) {
101 					aSource.fileNotAvail();
102 					delete f;
103 					return false;
104 				}
105 
106 				free = free || (sz <= (int64_t)(SETTING(SET_MINISLOT_SIZE) * 1024) );
107 
108 				f->setPos(start);
109 				is = f;
110 				if((start + size) < sz) {
111 					is = new LimitedInputStream<true>(is, size);
112 				}
113 			}
114 			type = userlist ? Transfer::TYPE_FULL_LIST : Transfer::TYPE_FILE;
115 		} else if(aType == Transfer::names[Transfer::TYPE_TREE]) {
116 			sourceFile = ShareManager::getInstance()->toReal(aFile);
117 			MemoryInputStream* mis = ShareManager::getInstance()->getTree(aFile);
118 			if(!mis) {
119 				aSource.fileNotAvail();
120 				return false;
121 			}
122 
123 			start = 0;
124 			size = mis->getSize();
125 			is = mis;
126 			free = true;
127 			type = Transfer::TYPE_TREE;
128 		} else if(aType == Transfer::names[Transfer::TYPE_PARTIAL_LIST]) {
129 			// Partial file list
130 			MemoryInputStream* mis = ShareManager::getInstance()->generatePartialList(aFile, listRecursive);
131 			if(mis == NULL) {
132 				aSource.fileNotAvail();
133 				return false;
134 			}
135 
136 			start = 0;
137 			size = mis->getSize();
138 			is = mis;
139 			free = true;
140 			type = Transfer::TYPE_PARTIAL_LIST;
141 		} else {
142 			aSource.fileNotAvail("Unknown file type");
143 			return false;
144 		}
145 	} catch(const ShareException& e) {
146 		aSource.fileNotAvail(e.getError());
147 		return false;
148 	} catch(const Exception& e) {
149 		LogManager::getInstance()->message(str(F_("Unable to send file %1%: %2%") % Util::addBrackets(sourceFile) % e.getError()));
150 		aSource.fileNotAvail();
151 		return false;
152 	}
153 
154 	Lock l(cs);
155 
156 	bool extraSlot = false;
157 
158 	if(!aSource.isSet(UserConnection::FLAG_HASSLOT)) {
159 		bool hasReserved = (reservedSlots.find(aSource.getUser()) != reservedSlots.end());
160 		bool isFavorite = FavoriteManager::getInstance()->hasSlot(aSource.getUser());
161 
162 		if(!(hasReserved || isFavorite || getFreeSlots() > 0 || getAutoSlot())) {
163 			bool supportsFree = aSource.isSet(UserConnection::FLAG_SUPPORTS_MINISLOTS);
164 			bool allowedFree = aSource.isSet(UserConnection::FLAG_HASEXTRASLOT) || aSource.isSet(UserConnection::FLAG_OP) || getFreeExtraSlots() > 0;
165 			if(free && supportsFree && allowedFree) {
166 				extraSlot = true;
167 			} else {
168 				delete is;
169 				aSource.maxedOut();
170 
171 				// Check for tth root identifier
172 				string tFile = aFile;
173 				if (tFile.compare(0, 4, "TTH/") == 0)
174 					tFile = ShareManager::getInstance()->toVirtual(TTHValue(aFile.substr(4)));
175 
176 				addFailedUpload(aSource, tFile +
177 					" (" +  Util::formatBytes(aStartPos) + " - " + Util::formatBytes(aStartPos + aBytes) + ")");
178 				aSource.disconnect();
179 				return false;
180 			}
181 		} else {
182 			clearUserFiles(aSource.getUser());	// this user is using a full slot, nix them.
183 		}
184 
185 		setLastGrant(GET_TICK());
186 	}
187 
188 	Upload* u = new Upload(aSource, sourceFile, TTHValue());
189 	u->setStream(is);
190 	u->setSegment(Segment(start, size));
191 
192 	u->setType(type);
193 
194 	uploads.push_back(u);
195 
196 	if(!aSource.isSet(UserConnection::FLAG_HASSLOT)) {
197 		if(extraSlot) {
198 			if(!aSource.isSet(UserConnection::FLAG_HASEXTRASLOT)) {
199 				aSource.setFlag(UserConnection::FLAG_HASEXTRASLOT);
200 				extra++;
201 			}
202 		} else {
203 			if(aSource.isSet(UserConnection::FLAG_HASEXTRASLOT)) {
204 				aSource.unsetFlag(UserConnection::FLAG_HASEXTRASLOT);
205 				extra--;
206 			}
207 			aSource.setFlag(UserConnection::FLAG_HASSLOT);
208 			running++;
209 		}
210 
211 		reservedSlots.erase(aSource.getUser());
212 	}
213 
214 	return true;
215 }
216 
getRunningAverage()217 int64_t UploadManager::getRunningAverage() {
218 	Lock l(cs);
219 	int64_t avg = 0;
220 	for(UploadList::iterator i = uploads.begin(); i != uploads.end(); ++i) {
221 		Upload* u = *i;
222 		avg += u->getAverageSpeed();
223 	}
224 	return avg;
225 }
226 
getAutoSlot()227 bool UploadManager::getAutoSlot() {
228 	/** A 0 in settings means disable */
229 	if(SETTING(MIN_UPLOAD_SPEED) == 0)
230 		return false;
231 	/** Only grant one slot per 30 sec */
232 	if(GET_TICK() < getLastGrant() + 30*1000)
233 		return false;
234 	/** Grant if upload speed is less than the threshold speed */
235 	return getRunningAverage() < (SETTING(MIN_UPLOAD_SPEED)*1024);
236 }
237 
removeUpload(Upload * aUpload)238 void UploadManager::removeUpload(Upload* aUpload) {
239 	Lock l(cs);
240 	dcassert(find(uploads.begin(), uploads.end(), aUpload) != uploads.end());
241 	uploads.erase(remove(uploads.begin(), uploads.end(), aUpload), uploads.end());
242 	delete aUpload;
243 }
244 
reserveSlot(const UserPtr & aUser,const string & hubHint)245 void UploadManager::reserveSlot(const UserPtr& aUser, const string& hubHint) {
246 	{
247 		Lock l(cs);
248 		reservedSlots.insert(aUser);
249 	}
250 	if(aUser->isOnline())
251 		ClientManager::getInstance()->connect(aUser, Util::toString(Util::rand()), hubHint);
252 }
253 
on(UserConnectionListener::Get,UserConnection * aSource,const string & aFile,int64_t aResume)254 void UploadManager::on(UserConnectionListener::Get, UserConnection* aSource, const string& aFile, int64_t aResume) throw() {
255 	if(aSource->getState() != UserConnection::STATE_GET) {
256 		dcdebug("UM::onGet Bad state, ignoring\n");
257 		return;
258 	}
259 
260 	if(prepareFile(*aSource, Transfer::names[Transfer::TYPE_FILE], Util::toAdcFile(aFile), aResume, -1)) {
261 		aSource->setState(UserConnection::STATE_SEND);
262 		aSource->fileLength(Util::toString(aSource->getUpload()->getSize()));
263 	}
264 }
265 
on(UserConnectionListener::Send,UserConnection * aSource)266 void UploadManager::on(UserConnectionListener::Send, UserConnection* aSource) throw() {
267 	if(aSource->getState() != UserConnection::STATE_SEND) {
268 		dcdebug("UM::onSend Bad state, ignoring\n");
269 		return;
270 	}
271 
272 	Upload* u = aSource->getUpload();
273 	dcassert(u != NULL);
274 
275 	u->setStart(GET_TICK());
276 	u->tick();
277 	aSource->setState(UserConnection::STATE_RUNNING);
278 	aSource->transmitFile(u->getStream());
279 	fire(UploadManagerListener::Starting(), u);
280 }
281 
on(AdcCommand::GET,UserConnection * aSource,const AdcCommand & c)282 void UploadManager::on(AdcCommand::GET, UserConnection* aSource, const AdcCommand& c) throw() {
283 	if(aSource->getState() != UserConnection::STATE_GET) {
284 		dcdebug("UM::onGET Bad state, ignoring\n");
285 		return;
286 	}
287 
288 	const string& type = c.getParam(0);
289 	const string& fname = c.getParam(1);
290 	int64_t aStartPos = Util::toInt64(c.getParam(2));
291 	int64_t aBytes = Util::toInt64(c.getParam(3));
292 
293 	if(prepareFile(*aSource, type, fname, aStartPos, aBytes, c.hasFlag("RE", 4))) {
294 		Upload* u = aSource->getUpload();
295 		dcassert(u != NULL);
296 
297 		AdcCommand cmd(AdcCommand::CMD_SND);
298 		cmd.addParam(type).addParam(fname)
299 			.addParam(Util::toString(u->getStartPos()))
300 			.addParam(Util::toString(u->getSize()));
301 
302 		if(c.hasFlag("ZL", 4)) {
303 			u->setStream(new FilteredInputStream<ZFilter, true>(u->getStream()));
304 			u->setFlag(Upload::FLAG_ZUPLOAD);
305 			cmd.addParam("ZL1");
306 		}
307 
308 		aSource->send(cmd);
309 
310 		u->setStart(GET_TICK());
311 		u->tick();
312 		aSource->setState(UserConnection::STATE_RUNNING);
313 		aSource->transmitFile(u->getStream());
314 		fire(UploadManagerListener::Starting(), u);
315 	}
316 }
317 
on(UserConnectionListener::BytesSent,UserConnection * aSource,size_t aBytes,size_t aActual)318 void UploadManager::on(UserConnectionListener::BytesSent, UserConnection* aSource, size_t aBytes, size_t aActual) throw() {
319 	dcassert(aSource->getState() == UserConnection::STATE_RUNNING);
320 	Upload* u = aSource->getUpload();
321 	dcassert(u != NULL);
322 	u->addPos(aBytes, aActual);
323 	u->tick();
324 }
325 
on(UserConnectionListener::Failed,UserConnection * aSource,const string & aError)326 void UploadManager::on(UserConnectionListener::Failed, UserConnection* aSource, const string& aError) throw() {
327 	Upload* u = aSource->getUpload();
328 
329 	if(u) {
330 		fire(UploadManagerListener::Failed(), u, aError);
331 
332 		dcdebug("UM::onFailed (%s): Removing upload\n", aError.c_str());
333 		removeUpload(u);
334 	}
335 
336 	removeConnection(aSource);
337 }
338 
on(UserConnectionListener::TransmitDone,UserConnection * aSource)339 void UploadManager::on(UserConnectionListener::TransmitDone, UserConnection* aSource) throw() {
340 	dcassert(aSource->getState() == UserConnection::STATE_RUNNING);
341 	Upload* u = aSource->getUpload();
342 	dcassert(u != NULL);
343 
344 	aSource->setState(UserConnection::STATE_GET);
345 
346 	if(BOOLSETTING(LOG_UPLOADS) && u->getType() != Transfer::TYPE_TREE && (BOOLSETTING(LOG_FILELIST_TRANSFERS) || u->getType() != Transfer::TYPE_FULL_LIST)) {
347 		StringMap params;
348 		u->getParams(*aSource, params);
349 		LOG(LogManager::UPLOAD, params);
350 	}
351 
352 	fire(UploadManagerListener::Complete(), u);
353 	removeUpload(u);
354 }
355 
addFailedUpload(const UserConnection & source,string filename)356 void UploadManager::addFailedUpload(const UserConnection& source, string filename) {
357 	{
358 		Lock l(cs);
359 		WaitingUserList::iterator it = find_if(waitingUsers.begin(), waitingUsers.end(), CompareFirst<UserPtr, uint32_t>(source.getUser()));
360 		if (it==waitingUsers.end()) {
361 			waitingUsers.push_back(WaitingUser(source.getUser(), GET_TICK()));
362 		} else {
363 			it->second = GET_TICK();
364 		}
365 		waitingFiles[source.getUser()].insert(filename);		//files for which user's asked
366 	}
367 
368 	fire(UploadManagerListener::WaitingAddFile(), source.getUser(), filename);
369 }
370 
clearUserFiles(const UserPtr & source)371 void UploadManager::clearUserFiles(const UserPtr& source) {
372 	Lock l(cs);
373 	//run this when a user's got a slot or goes offline.
374 	WaitingUserList::iterator sit = find_if(waitingUsers.begin(), waitingUsers.end(), CompareFirst<UserPtr, uint32_t>(source));
375 	if (sit == waitingUsers.end()) return;
376 
377 	FilesMap::iterator fit = waitingFiles.find(sit->first);
378 	if (fit != waitingFiles.end()) waitingFiles.erase(fit);
379 	fire(UploadManagerListener::WaitingRemoveUser(), sit->first);
380 
381 	waitingUsers.erase(sit);
382 }
383 
getWaitingUsers()384 UserList UploadManager::getWaitingUsers() {
385 	Lock l(cs);
386 	UserList u;
387 	for(WaitingUserList::const_iterator i = waitingUsers.begin(); i != waitingUsers.end(); ++i) {
388 		u.push_back(i->first);
389 	}
390 	return u;
391 }
392 
getWaitingUserFiles(const UserPtr & u)393 const UploadManager::FileSet& UploadManager::getWaitingUserFiles(const UserPtr& u) {
394 	Lock l(cs);
395 	return waitingFiles.find(u)->second;
396 }
397 
addConnection(UserConnectionPtr conn)398 void UploadManager::addConnection(UserConnectionPtr conn) {
399 	conn->addListener(this);
400 	conn->setState(UserConnection::STATE_GET);
401 }
402 
removeConnection(UserConnection * aSource)403 void UploadManager::removeConnection(UserConnection* aSource) {
404 	dcassert(aSource->getUpload() == NULL);
405 	aSource->removeListener(this);
406 	if(aSource->isSet(UserConnection::FLAG_HASSLOT)) {
407 		running--;
408 		aSource->unsetFlag(UserConnection::FLAG_HASSLOT);
409 	}
410 	if(aSource->isSet(UserConnection::FLAG_HASEXTRASLOT)) {
411 		extra--;
412 		aSource->unsetFlag(UserConnection::FLAG_HASEXTRASLOT);
413 	}
414 }
415 
on(TimerManagerListener::Minute,uint32_t)416 void UploadManager::on(TimerManagerListener::Minute, uint32_t /* aTick */) throw() {
417 	UserList disconnects;
418 	{
419 		Lock l(cs);
420 
421 		WaitingUserList::iterator i = stable_partition(waitingUsers.begin(), waitingUsers.end(), WaitingUserFresh());
422 		for (WaitingUserList::iterator j = i; j != waitingUsers.end(); ++j) {
423 			FilesMap::iterator fit = waitingFiles.find(j->first);
424 			if (fit != waitingFiles.end()) waitingFiles.erase(fit);
425 			fire(UploadManagerListener::WaitingRemoveUser(), j->first);
426 		}
427 
428 		waitingUsers.erase(i, waitingUsers.end());
429 
430 		if( BOOLSETTING(AUTO_KICK) ) {
431 			for(UploadList::iterator i = uploads.begin(); i != uploads.end(); ++i) {
432 				Upload* u = *i;
433 				if(u->getUser()->isOnline()) {
434 					u->unsetFlag(Upload::FLAG_PENDING_KICK);
435 					continue;
436 				}
437 
438 				if(u->isSet(Upload::FLAG_PENDING_KICK)) {
439 					disconnects.push_back(u->getUser());
440 					continue;
441 				}
442 
443 				if(BOOLSETTING(AUTO_KICK_NO_FAVS) && FavoriteManager::getInstance()->isFavoriteUser(u->getUser())) {
444 					continue;
445 				}
446 
447 				u->setFlag(Upload::FLAG_PENDING_KICK);
448 			}
449 		}
450 	}
451 
452 	for(UserList::iterator i = disconnects.begin(); i != disconnects.end(); ++i) {
453 		LogManager::getInstance()->message(str(F_("Disconnected user leaving the hub: %1%") % Util::toString(ClientManager::getInstance()->getNicks((*i)->getCID()))));
454 		ConnectionManager::getInstance()->disconnect(*i, false);
455 	}
456 }
457 
on(GetListLength,UserConnection * conn)458 void UploadManager::on(GetListLength, UserConnection* conn) throw() {
459 	conn->error("GetListLength not supported");
460 	conn->disconnect(false);
461 }
462 
on(AdcCommand::GFI,UserConnection * aSource,const AdcCommand & c)463 void UploadManager::on(AdcCommand::GFI, UserConnection* aSource, const AdcCommand& c) throw() {
464 	if(aSource->getState() != UserConnection::STATE_GET) {
465 		dcdebug("UM::onSend Bad state, ignoring\n");
466 		return;
467 	}
468 
469 	if(c.getParameters().size() < 2) {
470 		aSource->send(AdcCommand(AdcCommand::SEV_RECOVERABLE, AdcCommand::ERROR_PROTOCOL_GENERIC, "Missing parameters"));
471 		return;
472 	}
473 
474 	const string& type = c.getParam(0);
475 	const string& ident = c.getParam(1);
476 
477 	if(type == Transfer::names[Transfer::TYPE_FILE]) {
478 		try {
479 			aSource->send(ShareManager::getInstance()->getFileInfo(ident));
480 		} catch(const ShareException&) {
481 			aSource->fileNotAvail();
482 		}
483 	} else {
484 		aSource->fileNotAvail();
485 	}
486 }
487 
488 // TimerManagerListener
on(TimerManagerListener::Second,uint32_t)489 void UploadManager::on(TimerManagerListener::Second, uint32_t) throw() {
490 	Lock l(cs);
491 	UploadList ticks;
492 
493 	for(UploadList::iterator i = uploads.begin(); i != uploads.end(); ++i) {
494 		if((*i)->getPos() > 0) {
495 			ticks.push_back(*i);
496 			(*i)->tick();
497 		}
498 	}
499 
500 	if(ticks.size() > 0)
501 		fire(UploadManagerListener::Tick(), ticks);
502 }
503 
on(ClientManagerListener::UserDisconnected,const UserPtr & aUser)504 void UploadManager::on(ClientManagerListener::UserDisconnected, const UserPtr& aUser) throw() {
505 	if(!aUser->isOnline()) {
506 		clearUserFiles(aUser);
507 	}
508 }
509 
510 } // namespace dcpp
511