1 /*
2 * Copyright (C) 2001-2009 Jacek Sieka, arnetheduck on gmail point com
3 *
4 * This program is free software; you can redistribute it and/or modify
5 * it under the terms of the GNU General Public License as published by
6 * the Free Software Foundation; either version 2 of the License, or
7 * (at your option) any later version.
8 *
9 * This program is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 * GNU General Public License for more details.
13 *
14 * You should have received a copy of the GNU General Public License
15 * along with this program; if not, write to the Free Software
16 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
17 */
18
19 #include "stdinc.h"
20 #include "DCPlusPlus.h"
21
22 #include "UploadManager.h"
23
24 #include "ConnectionManager.h"
25 #include "LogManager.h"
26 #include "ShareManager.h"
27 #include "ClientManager.h"
28 #include "FilteredFile.h"
29 #include "ZUtils.h"
30 #include "HashManager.h"
31 #include "AdcCommand.h"
32 #include "FavoriteManager.h"
33 #include "CryptoManager.h"
34 #include "Upload.h"
35 #include "UserConnection.h"
36
37 #include <functional>
38
39 namespace dcpp {
40
41 static const string UPLOAD_AREA = "Uploads";
42
UploadManager()43 UploadManager::UploadManager() throw() : running(0), extra(0), lastGrant(0) {
44 ClientManager::getInstance()->addListener(this);
45 TimerManager::getInstance()->addListener(this);
46 }
47
~UploadManager()48 UploadManager::~UploadManager() throw() {
49 TimerManager::getInstance()->removeListener(this);
50 ClientManager::getInstance()->removeListener(this);
51 while(true) {
52 {
53 Lock l(cs);
54 if(uploads.empty())
55 break;
56 }
57 Thread::sleep(100);
58 }
59 }
60
prepareFile(UserConnection & aSource,const string & aType,const string & aFile,int64_t aStartPos,int64_t aBytes,bool listRecursive)61 bool UploadManager::prepareFile(UserConnection& aSource, const string& aType, const string& aFile, int64_t aStartPos, int64_t aBytes, bool listRecursive) {
62 dcdebug("Preparing %s %s " I64_FMT " " I64_FMT " %d\n", aType.c_str(), aFile.c_str(), aStartPos, aBytes, listRecursive);
63
64 if(aFile.empty() || aStartPos < 0 || aBytes < -1 || aBytes == 0) {
65 aSource.fileNotAvail("Invalid request");
66 return false;
67 }
68
69 InputStream* is = 0;
70 int64_t start = 0;
71 int64_t size = 0;
72
73 bool userlist = (aFile == Transfer::USER_LIST_NAME_BZ || aFile == Transfer::USER_LIST_NAME);
74 bool free = userlist;
75
76 string sourceFile;
77 Transfer::Type type;
78
79 try {
80 if(aType == Transfer::names[Transfer::TYPE_FILE]) {
81 sourceFile = ShareManager::getInstance()->toReal(aFile);
82
83 if(aFile == Transfer::USER_LIST_NAME) {
84 // Unpack before sending...
85 string bz2 = File(sourceFile, File::READ, File::OPEN).read();
86 string xml;
87 CryptoManager::getInstance()->decodeBZ2(reinterpret_cast<const uint8_t*>(bz2.data()), bz2.size(), xml);
88 // Clear to save some memory...
89 string().swap(bz2);
90 is = new MemoryInputStream(xml);
91 start = 0;
92 size = xml.size();
93 } else {
94 File* f = new File(sourceFile, File::READ, File::OPEN);
95
96 start = aStartPos;
97 int64_t sz = f->getSize();
98 size = (aBytes == -1) ? sz - start : aBytes;
99
100 if((start + size) > sz) {
101 aSource.fileNotAvail();
102 delete f;
103 return false;
104 }
105
106 free = free || (sz <= (int64_t)(SETTING(SET_MINISLOT_SIZE) * 1024) );
107
108 f->setPos(start);
109 is = f;
110 if((start + size) < sz) {
111 is = new LimitedInputStream<true>(is, size);
112 }
113 }
114 type = userlist ? Transfer::TYPE_FULL_LIST : Transfer::TYPE_FILE;
115 } else if(aType == Transfer::names[Transfer::TYPE_TREE]) {
116 sourceFile = ShareManager::getInstance()->toReal(aFile);
117 MemoryInputStream* mis = ShareManager::getInstance()->getTree(aFile);
118 if(!mis) {
119 aSource.fileNotAvail();
120 return false;
121 }
122
123 start = 0;
124 size = mis->getSize();
125 is = mis;
126 free = true;
127 type = Transfer::TYPE_TREE;
128 } else if(aType == Transfer::names[Transfer::TYPE_PARTIAL_LIST]) {
129 // Partial file list
130 MemoryInputStream* mis = ShareManager::getInstance()->generatePartialList(aFile, listRecursive);
131 if(mis == NULL) {
132 aSource.fileNotAvail();
133 return false;
134 }
135
136 start = 0;
137 size = mis->getSize();
138 is = mis;
139 free = true;
140 type = Transfer::TYPE_PARTIAL_LIST;
141 } else {
142 aSource.fileNotAvail("Unknown file type");
143 return false;
144 }
145 } catch(const ShareException& e) {
146 aSource.fileNotAvail(e.getError());
147 return false;
148 } catch(const Exception& e) {
149 LogManager::getInstance()->message(str(F_("Unable to send file %1%: %2%") % Util::addBrackets(sourceFile) % e.getError()));
150 aSource.fileNotAvail();
151 return false;
152 }
153
154 Lock l(cs);
155
156 bool extraSlot = false;
157
158 if(!aSource.isSet(UserConnection::FLAG_HASSLOT)) {
159 bool hasReserved = (reservedSlots.find(aSource.getUser()) != reservedSlots.end());
160 bool isFavorite = FavoriteManager::getInstance()->hasSlot(aSource.getUser());
161
162 if(!(hasReserved || isFavorite || getFreeSlots() > 0 || getAutoSlot())) {
163 bool supportsFree = aSource.isSet(UserConnection::FLAG_SUPPORTS_MINISLOTS);
164 bool allowedFree = aSource.isSet(UserConnection::FLAG_HASEXTRASLOT) || aSource.isSet(UserConnection::FLAG_OP) || getFreeExtraSlots() > 0;
165 if(free && supportsFree && allowedFree) {
166 extraSlot = true;
167 } else {
168 delete is;
169 aSource.maxedOut();
170
171 // Check for tth root identifier
172 string tFile = aFile;
173 if (tFile.compare(0, 4, "TTH/") == 0)
174 tFile = ShareManager::getInstance()->toVirtual(TTHValue(aFile.substr(4)));
175
176 addFailedUpload(aSource, tFile +
177 " (" + Util::formatBytes(aStartPos) + " - " + Util::formatBytes(aStartPos + aBytes) + ")");
178 aSource.disconnect();
179 return false;
180 }
181 } else {
182 clearUserFiles(aSource.getUser()); // this user is using a full slot, nix them.
183 }
184
185 setLastGrant(GET_TICK());
186 }
187
188 Upload* u = new Upload(aSource, sourceFile, TTHValue());
189 u->setStream(is);
190 u->setSegment(Segment(start, size));
191
192 u->setType(type);
193
194 uploads.push_back(u);
195
196 if(!aSource.isSet(UserConnection::FLAG_HASSLOT)) {
197 if(extraSlot) {
198 if(!aSource.isSet(UserConnection::FLAG_HASEXTRASLOT)) {
199 aSource.setFlag(UserConnection::FLAG_HASEXTRASLOT);
200 extra++;
201 }
202 } else {
203 if(aSource.isSet(UserConnection::FLAG_HASEXTRASLOT)) {
204 aSource.unsetFlag(UserConnection::FLAG_HASEXTRASLOT);
205 extra--;
206 }
207 aSource.setFlag(UserConnection::FLAG_HASSLOT);
208 running++;
209 }
210
211 reservedSlots.erase(aSource.getUser());
212 }
213
214 return true;
215 }
216
getRunningAverage()217 int64_t UploadManager::getRunningAverage() {
218 Lock l(cs);
219 int64_t avg = 0;
220 for(UploadList::iterator i = uploads.begin(); i != uploads.end(); ++i) {
221 Upload* u = *i;
222 avg += u->getAverageSpeed();
223 }
224 return avg;
225 }
226
getAutoSlot()227 bool UploadManager::getAutoSlot() {
228 /** A 0 in settings means disable */
229 if(SETTING(MIN_UPLOAD_SPEED) == 0)
230 return false;
231 /** Only grant one slot per 30 sec */
232 if(GET_TICK() < getLastGrant() + 30*1000)
233 return false;
234 /** Grant if upload speed is less than the threshold speed */
235 return getRunningAverage() < (SETTING(MIN_UPLOAD_SPEED)*1024);
236 }
237
removeUpload(Upload * aUpload)238 void UploadManager::removeUpload(Upload* aUpload) {
239 Lock l(cs);
240 dcassert(find(uploads.begin(), uploads.end(), aUpload) != uploads.end());
241 uploads.erase(remove(uploads.begin(), uploads.end(), aUpload), uploads.end());
242 delete aUpload;
243 }
244
reserveSlot(const UserPtr & aUser,const string & hubHint)245 void UploadManager::reserveSlot(const UserPtr& aUser, const string& hubHint) {
246 {
247 Lock l(cs);
248 reservedSlots.insert(aUser);
249 }
250 if(aUser->isOnline())
251 ClientManager::getInstance()->connect(aUser, Util::toString(Util::rand()), hubHint);
252 }
253
on(UserConnectionListener::Get,UserConnection * aSource,const string & aFile,int64_t aResume)254 void UploadManager::on(UserConnectionListener::Get, UserConnection* aSource, const string& aFile, int64_t aResume) throw() {
255 if(aSource->getState() != UserConnection::STATE_GET) {
256 dcdebug("UM::onGet Bad state, ignoring\n");
257 return;
258 }
259
260 if(prepareFile(*aSource, Transfer::names[Transfer::TYPE_FILE], Util::toAdcFile(aFile), aResume, -1)) {
261 aSource->setState(UserConnection::STATE_SEND);
262 aSource->fileLength(Util::toString(aSource->getUpload()->getSize()));
263 }
264 }
265
on(UserConnectionListener::Send,UserConnection * aSource)266 void UploadManager::on(UserConnectionListener::Send, UserConnection* aSource) throw() {
267 if(aSource->getState() != UserConnection::STATE_SEND) {
268 dcdebug("UM::onSend Bad state, ignoring\n");
269 return;
270 }
271
272 Upload* u = aSource->getUpload();
273 dcassert(u != NULL);
274
275 u->setStart(GET_TICK());
276 u->tick();
277 aSource->setState(UserConnection::STATE_RUNNING);
278 aSource->transmitFile(u->getStream());
279 fire(UploadManagerListener::Starting(), u);
280 }
281
on(AdcCommand::GET,UserConnection * aSource,const AdcCommand & c)282 void UploadManager::on(AdcCommand::GET, UserConnection* aSource, const AdcCommand& c) throw() {
283 if(aSource->getState() != UserConnection::STATE_GET) {
284 dcdebug("UM::onGET Bad state, ignoring\n");
285 return;
286 }
287
288 const string& type = c.getParam(0);
289 const string& fname = c.getParam(1);
290 int64_t aStartPos = Util::toInt64(c.getParam(2));
291 int64_t aBytes = Util::toInt64(c.getParam(3));
292
293 if(prepareFile(*aSource, type, fname, aStartPos, aBytes, c.hasFlag("RE", 4))) {
294 Upload* u = aSource->getUpload();
295 dcassert(u != NULL);
296
297 AdcCommand cmd(AdcCommand::CMD_SND);
298 cmd.addParam(type).addParam(fname)
299 .addParam(Util::toString(u->getStartPos()))
300 .addParam(Util::toString(u->getSize()));
301
302 if(c.hasFlag("ZL", 4)) {
303 u->setStream(new FilteredInputStream<ZFilter, true>(u->getStream()));
304 u->setFlag(Upload::FLAG_ZUPLOAD);
305 cmd.addParam("ZL1");
306 }
307
308 aSource->send(cmd);
309
310 u->setStart(GET_TICK());
311 u->tick();
312 aSource->setState(UserConnection::STATE_RUNNING);
313 aSource->transmitFile(u->getStream());
314 fire(UploadManagerListener::Starting(), u);
315 }
316 }
317
on(UserConnectionListener::BytesSent,UserConnection * aSource,size_t aBytes,size_t aActual)318 void UploadManager::on(UserConnectionListener::BytesSent, UserConnection* aSource, size_t aBytes, size_t aActual) throw() {
319 dcassert(aSource->getState() == UserConnection::STATE_RUNNING);
320 Upload* u = aSource->getUpload();
321 dcassert(u != NULL);
322 u->addPos(aBytes, aActual);
323 u->tick();
324 }
325
on(UserConnectionListener::Failed,UserConnection * aSource,const string & aError)326 void UploadManager::on(UserConnectionListener::Failed, UserConnection* aSource, const string& aError) throw() {
327 Upload* u = aSource->getUpload();
328
329 if(u) {
330 fire(UploadManagerListener::Failed(), u, aError);
331
332 dcdebug("UM::onFailed (%s): Removing upload\n", aError.c_str());
333 removeUpload(u);
334 }
335
336 removeConnection(aSource);
337 }
338
on(UserConnectionListener::TransmitDone,UserConnection * aSource)339 void UploadManager::on(UserConnectionListener::TransmitDone, UserConnection* aSource) throw() {
340 dcassert(aSource->getState() == UserConnection::STATE_RUNNING);
341 Upload* u = aSource->getUpload();
342 dcassert(u != NULL);
343
344 aSource->setState(UserConnection::STATE_GET);
345
346 if(BOOLSETTING(LOG_UPLOADS) && u->getType() != Transfer::TYPE_TREE && (BOOLSETTING(LOG_FILELIST_TRANSFERS) || u->getType() != Transfer::TYPE_FULL_LIST)) {
347 StringMap params;
348 u->getParams(*aSource, params);
349 LOG(LogManager::UPLOAD, params);
350 }
351
352 fire(UploadManagerListener::Complete(), u);
353 removeUpload(u);
354 }
355
addFailedUpload(const UserConnection & source,string filename)356 void UploadManager::addFailedUpload(const UserConnection& source, string filename) {
357 {
358 Lock l(cs);
359 WaitingUserList::iterator it = find_if(waitingUsers.begin(), waitingUsers.end(), CompareFirst<UserPtr, uint32_t>(source.getUser()));
360 if (it==waitingUsers.end()) {
361 waitingUsers.push_back(WaitingUser(source.getUser(), GET_TICK()));
362 } else {
363 it->second = GET_TICK();
364 }
365 waitingFiles[source.getUser()].insert(filename); //files for which user's asked
366 }
367
368 fire(UploadManagerListener::WaitingAddFile(), source.getUser(), filename);
369 }
370
clearUserFiles(const UserPtr & source)371 void UploadManager::clearUserFiles(const UserPtr& source) {
372 Lock l(cs);
373 //run this when a user's got a slot or goes offline.
374 WaitingUserList::iterator sit = find_if(waitingUsers.begin(), waitingUsers.end(), CompareFirst<UserPtr, uint32_t>(source));
375 if (sit == waitingUsers.end()) return;
376
377 FilesMap::iterator fit = waitingFiles.find(sit->first);
378 if (fit != waitingFiles.end()) waitingFiles.erase(fit);
379 fire(UploadManagerListener::WaitingRemoveUser(), sit->first);
380
381 waitingUsers.erase(sit);
382 }
383
getWaitingUsers()384 UserList UploadManager::getWaitingUsers() {
385 Lock l(cs);
386 UserList u;
387 for(WaitingUserList::const_iterator i = waitingUsers.begin(); i != waitingUsers.end(); ++i) {
388 u.push_back(i->first);
389 }
390 return u;
391 }
392
getWaitingUserFiles(const UserPtr & u)393 const UploadManager::FileSet& UploadManager::getWaitingUserFiles(const UserPtr& u) {
394 Lock l(cs);
395 return waitingFiles.find(u)->second;
396 }
397
addConnection(UserConnectionPtr conn)398 void UploadManager::addConnection(UserConnectionPtr conn) {
399 conn->addListener(this);
400 conn->setState(UserConnection::STATE_GET);
401 }
402
removeConnection(UserConnection * aSource)403 void UploadManager::removeConnection(UserConnection* aSource) {
404 dcassert(aSource->getUpload() == NULL);
405 aSource->removeListener(this);
406 if(aSource->isSet(UserConnection::FLAG_HASSLOT)) {
407 running--;
408 aSource->unsetFlag(UserConnection::FLAG_HASSLOT);
409 }
410 if(aSource->isSet(UserConnection::FLAG_HASEXTRASLOT)) {
411 extra--;
412 aSource->unsetFlag(UserConnection::FLAG_HASEXTRASLOT);
413 }
414 }
415
on(TimerManagerListener::Minute,uint32_t)416 void UploadManager::on(TimerManagerListener::Minute, uint32_t /* aTick */) throw() {
417 UserList disconnects;
418 {
419 Lock l(cs);
420
421 WaitingUserList::iterator i = stable_partition(waitingUsers.begin(), waitingUsers.end(), WaitingUserFresh());
422 for (WaitingUserList::iterator j = i; j != waitingUsers.end(); ++j) {
423 FilesMap::iterator fit = waitingFiles.find(j->first);
424 if (fit != waitingFiles.end()) waitingFiles.erase(fit);
425 fire(UploadManagerListener::WaitingRemoveUser(), j->first);
426 }
427
428 waitingUsers.erase(i, waitingUsers.end());
429
430 if( BOOLSETTING(AUTO_KICK) ) {
431 for(UploadList::iterator i = uploads.begin(); i != uploads.end(); ++i) {
432 Upload* u = *i;
433 if(u->getUser()->isOnline()) {
434 u->unsetFlag(Upload::FLAG_PENDING_KICK);
435 continue;
436 }
437
438 if(u->isSet(Upload::FLAG_PENDING_KICK)) {
439 disconnects.push_back(u->getUser());
440 continue;
441 }
442
443 if(BOOLSETTING(AUTO_KICK_NO_FAVS) && FavoriteManager::getInstance()->isFavoriteUser(u->getUser())) {
444 continue;
445 }
446
447 u->setFlag(Upload::FLAG_PENDING_KICK);
448 }
449 }
450 }
451
452 for(UserList::iterator i = disconnects.begin(); i != disconnects.end(); ++i) {
453 LogManager::getInstance()->message(str(F_("Disconnected user leaving the hub: %1%") % Util::toString(ClientManager::getInstance()->getNicks((*i)->getCID()))));
454 ConnectionManager::getInstance()->disconnect(*i, false);
455 }
456 }
457
on(GetListLength,UserConnection * conn)458 void UploadManager::on(GetListLength, UserConnection* conn) throw() {
459 conn->error("GetListLength not supported");
460 conn->disconnect(false);
461 }
462
on(AdcCommand::GFI,UserConnection * aSource,const AdcCommand & c)463 void UploadManager::on(AdcCommand::GFI, UserConnection* aSource, const AdcCommand& c) throw() {
464 if(aSource->getState() != UserConnection::STATE_GET) {
465 dcdebug("UM::onSend Bad state, ignoring\n");
466 return;
467 }
468
469 if(c.getParameters().size() < 2) {
470 aSource->send(AdcCommand(AdcCommand::SEV_RECOVERABLE, AdcCommand::ERROR_PROTOCOL_GENERIC, "Missing parameters"));
471 return;
472 }
473
474 const string& type = c.getParam(0);
475 const string& ident = c.getParam(1);
476
477 if(type == Transfer::names[Transfer::TYPE_FILE]) {
478 try {
479 aSource->send(ShareManager::getInstance()->getFileInfo(ident));
480 } catch(const ShareException&) {
481 aSource->fileNotAvail();
482 }
483 } else {
484 aSource->fileNotAvail();
485 }
486 }
487
488 // TimerManagerListener
on(TimerManagerListener::Second,uint32_t)489 void UploadManager::on(TimerManagerListener::Second, uint32_t) throw() {
490 Lock l(cs);
491 UploadList ticks;
492
493 for(UploadList::iterator i = uploads.begin(); i != uploads.end(); ++i) {
494 if((*i)->getPos() > 0) {
495 ticks.push_back(*i);
496 (*i)->tick();
497 }
498 }
499
500 if(ticks.size() > 0)
501 fire(UploadManagerListener::Tick(), ticks);
502 }
503
on(ClientManagerListener::UserDisconnected,const UserPtr & aUser)504 void UploadManager::on(ClientManagerListener::UserDisconnected, const UserPtr& aUser) throw() {
505 if(!aUser->isOnline()) {
506 clearUserFiles(aUser);
507 }
508 }
509
510 } // namespace dcpp
511