1 /***************************************************************************
2 * mgfilemanager.cpp
3 *
4 * Wed Sep 6 22:19:52 2006
5 * Copyright 2006 liubin,China
6 * Email multiget@gmail.com
7 ****************************************************************************/
8
9 /*
10 * This program is free software; you can redistribute it and/or modify
11 * it under the terms of the GNU General Public License as published by
12 * the Free Software Foundation; either version 2 of the License, or
13 * (at your option) any later version.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18 * GNU General Public License for more details.
19 *
20 * You should have received a copy of the GNU General Public License
21 * along with this program; if not, write to the Free Software
22 * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
23 */
24
25 /*
26
27 filename : mgfilemanager.cpp
28 create : 2006/09/09
29 author : liubin
30 EMAIL : multiget@gmail.com
31 version : 0.5
32 license : GPL
33
34
35 |***|***|***|***|***|***|***|***|***|***|***|***|***|***|***|***|***|***|
36 | | | | | | | | | | | | | | | | | | |
37
38 ***change log***
39 ***修订历史***
40 2006/07/06 :
41 修正GetNextTask()一些细节
42 2006/07/09 :
43 增加了写索引时防止中断
44 2006/07/10 :
45 修订了文件下载完后索引不一致的问题
46 2006/07/31 :
47 修改文件后缀,部分下载末尾加.mg,索引加.mgidx
48 系统整体修改,支持大文件处理
49 2006/08/07 :
50 修改GetNextTask,避免分配文件末尾给线程下载。
51 2006/08/11 :
52 修改GetNextTask,在分配新任务前取消原任务标记。
53 2006/08/12
54 读取索引时过滤长度为0的索引节点,否则可能索引太长了。
55 2006/08/13
56 增加HeartBeat,速度和完成等参数在本地执行
57 2006/09/07
58 为兼容VC6做一些修改,取消构造中运行
59 2006/09/09
60 为兼容VC6做一些修改
61 2006/10/02
62 为不能取得文件长度的任务做数据提交的完善
63 2006/10/06
64 multi url support
65 2006/10/22
66 some translation
67 2006/11/14
68 modify for mac and bsd compile
69 */
70
71 #include "mgfilemanager.h"
72 #include "showtablewindow.h"
73 #include "mgsingletask.h"
74 #include "mainframe.h"
75 #include "mgapp.h"
76 #include "speedctrl.h"
77 #include "mirroradmin.h"
78 #include "mgapp.h"
79
80 #ifdef WIN32
81 const static int _RECVBUF=16384; //seems lost the define in common.h in win32
82 #endif
83
84 #define _MGSTR(s) wxGetApp().GetStr(s)
85
86 extern CSpeedCtrl gSpeedCtrl;
87 //extern CMirrorAdmin gMirrorAdmin;
88
89 using namespace std;
90
CMgFileManager(CMgSingleTask * parent,std::string mainurl,std::string sname,std::string spath,llong nlen,std::string mainrefer)91 CMgFileManager::CMgFileManager(
92 CMgSingleTask *parent,
93 std::string mainurl,
94 std::string sname,
95 std::string spath,
96 llong nlen, //-1 for unknow length
97 std::string mainrefer //only for http
98 )
99 {
100 #ifdef WIN32
101 m_sPathFileName = spath + std::string( "\\" ) + sname;
102 #else
103 m_sPathFileName = spath + std::string( "/" ) + sname;
104 #endif
105 m_sFileName = sname;
106 m_nFileLen = nlen;
107 m_bStop = false;
108 m_pParent = parent;
109 m_bUpdateUI = parent->UpdateUI();
110 m_pShowWin = parent->GetShowTableWin();
111 m_nOriginBytes = 0;
112 m_nSumBytes = 0;
113 m_nSpeed = 0;
114 m_nTimeToFinish = -1;
115 m_fRatio = 0;
116
117 //add the main url to list
118 _ul tt;
119
120 tt.prior = 10000;
121 tt.url = mainurl;
122 tt.refer = mainrefer;
123 tt.bused = false;
124 m_UrlList.push_back( tt );
125
126 memset( m_AntSpeed, 0, 10 * sizeof( int ) );
127 memset( m_AntId2UrlIndex, 0, 10 * sizeof( int ) );
128 m_SumPack = 0;
129 m_nSwap = -1;
130 pthread_mutex_init ( &m_IndexMutex, NULL );
131 pthread_mutex_init ( &m_UListMutex, NULL );
132
133 }
134
135 //if true, init ok
CheckFile()136 bool CMgFileManager::CheckFile()
137 {
138
139 std::string tempmsg;
140
141 OutMsg( _MGSTR( _S_FILEMGR_TASKOBJ ) + m_sPathFileName , MSG_INFO );
142
143 //get user home directory
144 OutMsg( _MGSTR( _S_FILEMGR_GETTEMPDIR ), MSG_INFO );
145
146 std::string tempdir;
147
148 if ( !GetTempDir( tempdir ) )
149 {
150 OutMsg( "get tempdir fail", MSG_ERROR );
151 return false;
152 }
153
154 OutMsg( _MGSTR( _S_FILEMGR_TEMPDIR ) + tempdir );
155 #ifdef WIN32
156 tempdir += std::string( "\\" );
157 #else
158 tempdir += std::string( "/" );
159 #endif
160 m_IndexName = tempdir + m_sFileName + std::string( ".mgidx" );
161 m_PartName = tempdir + m_sFileName + std::string( ".mg" );
162
163 tempmsg = _MGSTR( _S_FILEMGR_TEMPNAME ) + m_PartName;
164 OutMsg( tempmsg );
165 tempmsg = _MGSTR( _S_FILEMGR_INDEXNAME ) + m_IndexName;
166 OutMsg( tempmsg );
167
168 //check if *.mgidx exists
169
170 bool bCanResume = ( m_nFileLen == -1 ? false : true );
171
172 OutMsg( _MGSTR( _S_FILEMGR_CHECKTEMP ), MSG_INFO );
173 FILE *fp;
174
175 if ( bCanResume && NULL != ( fp = fopen( m_PartName.c_str(), "r" ) ) )
176 {
177 fclose( fp );
178 OutMsg( _MGSTR( _S_FILEMGR_TEMPEXIST ), MSG_INFO );
179 }
180 else
181 {
182 bCanResume = false;
183 OutMsg( _MGSTR( _S_FILEMGR_TEMPNOTEXIST ), MSG_WARNNING );
184 }
185
186 if ( bCanResume )
187 {
188 OutMsg( _MGSTR( _S_FILEMGR_CHECKINDEX ), MSG_INFO );
189
190 if ( bCanResume && NULL != ( fp = fopen( m_IndexName.c_str(), "r" ) ) )
191 {
192 fclose( fp );
193 OutMsg( _MGSTR( _S_FILEMGR_INDEXEXIST ), MSG_INFO );
194 }
195 else
196 {
197 bCanResume = false;
198 OutMsg( _MGSTR( _S_FILEMGR_INDEXNOTEXIST ), MSG_WARNNING );
199 }
200 }
201
202 if ( bCanResume )
203 {
204 OutMsg( _MGSTR( _S_FILEMGR_CHECKTEMPLENGTH ) );
205
206 if ( bCanResume && m_nFileLen == FileSize( m_PartName ) )
207 {
208 OutMsg( _MGSTR( _S_FILEMGR_LENGTHOK ), MSG_SUCCESS );
209 }
210 else
211 {
212 bCanResume = false;
213 OutMsg( _MGSTR( _S_FILEMGR_LENGTHFAIL ), MSG_WARNNING );
214 }
215 }
216
217 //if can't resume,create new file
218 if ( bCanResume )
219 {
220 ReadIndex();
221 //m_bOldExist = true;
222 m_nOriginBytes = SumBytes();
223 }
224 else
225 {
226 OutMsg( _MGSTR( _S_FILEMGR_CREATEFILE ), MSG_INFO );
227 //m_bOldExist = false;
228 FILE *fp;
229
230 if ( NULL != ( fp = fopen( m_PartName.c_str(), "wb" ) ) )
231 {
232 if ( WriteIndex() < 0 )
233 {
234 OutMsg( "create index file fail.", MSG_ERROR );
235 //m_bError = true;
236 return false;
237 }
238
239 if ( m_nFileLen != -1 )
240 {
241 //写入文件长度的0到文件,可以对Linux做优化
242 #if defined(WIN32) || defined(__WINDOWS__)
243 char buf[ 1024 ];
244 memset( buf, 0, 1024 );
245 llong count = m_nFileLen;
246
247 while ( count >= 1024 )
248 {
249 fwrite( buf, 1, 1024, fp );
250 count -= 1024;
251 };
252
253 if ( count > 0 )
254 fwrite( buf, 1, count, fp );
255
256 #else
257 //对Linux的优化分配,2006/07/05
258 #if defined(__BSD__)|| defined(__DARWIN__) //it's wx preprocessor
259 //bsd and mac haven't ftruncate64
260 if ( 0 != ftruncate( fileno( fp ), m_nFileLen ) )
261 #else
262 if ( 0 != ftruncate64( fileno( fp ), m_nFileLen ) )
263 #endif
264 {
265 OutMsg( "ftruncate fail,quit.\n", MSG_ERROR );
266 return false;
267 }
268
269 #endif
270
271 } //if(m_nFileLen!=-1)
272
273 OutMsg( _MGSTR( _S_FILEMGR_CREATEFILE ), MSG_SUCCESS ); //msg should change
274
275 fclose( fp );
276
277 }
278 else
279 {
280 tempmsg = std::string( "create " ) + m_PartName + string( " fail,quit." );
281 OutMsg( tempmsg, MSG_ERROR );
282 return false;
283 }
284 }
285
286 if ( m_bUpdateUI && m_pShowWin != NULL )
287 {
288
289 struct bindex data[ 20 ];
290 pthread_mutex_lock( &m_IndexMutex );
291 int nsize = m_Index.size();
292
293 for ( int i = 0;i < nsize && i < 20;i++ )
294 {
295 data[ i ] = m_Index[ i ];
296 }
297
298 pthread_mutex_unlock( &m_IndexMutex );
299
300 m_pShowWin->SetRange( m_nFileLen, 0, nsize, data );
301 }
302
303 return true;
304
305 }
306
307
~CMgFileManager()308 CMgFileManager::~CMgFileManager()
309 {}
310
311 //read the idx file
312 //maybe more than 10 indexs
ReadIndex()313 int CMgFileManager::ReadIndex()
314 {
315 m_Index.clear();
316
317 ifstream idxfile( m_IndexName.c_str() );
318
319 if ( idxfile.bad() )
320 {
321 OutMsg("read index fail1",MSG_ERROR);
322 return 0;
323 }
324
325 char buf[ 256 ];
326
327 while ( idxfile.good() )
328 {
329 llong from, to;
330 idxfile.getline( buf, 256 );
331
332 #ifndef WIN32
333 if ( 2 != sscanf( buf, "(%lld,%lld)", &from, &to ) )
334 #else
335 if ( 2 != sscanf( buf, "(%I64d,%I64d)", &from, &to ) )
336 #endif
337 {
338 break;
339 }
340 else
341 {
342 if ( to <= from )
343 continue; //invalid data
344
345 if ( to < 0 || from < 0 )
346 continue; //invalid data
347
348 bindex node;
349
350 node.start = from;
351
352 node.end = to;
353
354 node.mark = 0;
355
356 m_Index.push_back( node );
357 }
358 }
359
360 idxfile.close();
361 //check index
362
363 if ( m_nFileLen == -1 )
364 {
365 m_Index.clear();
366 }
367 else
368 {
369 vector<bindex>::iterator it;
370
371 for ( it = m_Index.begin();it != m_Index.end();it++ )
372 {
373 if ( it->start < 0 || it->start > it->end || it->start > m_nFileLen )
374 {
375 m_Index.clear();
376 break;
377 }
378
379 if ( it->end < 0 || it->end < it->start || it->end > m_nFileLen )
380 {
381 m_Index.clear();
382 break;
383 }
384 }
385 }
386
387 if(m_Index.size()>=1) OutMsg("get index ok");
388 return m_Index.size();
389 }
390
391 //ok tid is 1 based
ThreadQuit(int tid)392 void CMgFileManager::ThreadQuit( int tid )
393 {
394
395 pthread_mutex_lock( &m_IndexMutex );
396
397 vector<bindex>::iterator it;
398
399 for ( it = m_Index.begin();it != m_Index.end();it++ )
400 {
401 if ( it->mark == tid )
402 {
403 it->mark = 0;
404
405 break;
406 }
407 }
408
409 MergeIndex(); //clear some nodes
410
411 pthread_mutex_unlock( &m_IndexMutex );
412 }
413
414 //write index to file
WriteIndex()415 int CMgFileManager::WriteIndex()
416 {
417
418 /*/
419 FILE *fp;
420 fp=fopen( m_IndexName.c_str(),"r");
421 if(fp==NULL) return -1;
422
423 char buf[128];
424 vector<bindex>::iterator it;
425 for ( it = m_Index.begin();it != m_Index.end();it++ )
426 {
427 sprintf(buf,"(%lld,%lld)\n",it->start,it->end);
428 fwrite(buf,strlen(buf),1,fp);
429
430 }
431
432 fclose(fp);
433 return m_Index.size();
434
435 /*/
436 ofstream idxfile( m_IndexName.c_str(), ios::out | ios::trunc );
437
438 if ( idxfile.bad() )
439 return -1;
440
441 vector<bindex>::iterator it;
442
443 for ( it = m_Index.begin();it != m_Index.end();it++ )
444 {
445 idxfile << "(" << it->start << "," << it->end << ")" << endl;
446 }
447
448 idxfile.close();
449 return m_Index.size();
450
451 //*/
452
453 }
454
455 //get file length,check if the length ok
FileSize(string sFileName)456 llong CMgFileManager::FileSize( string sFileName )
457 {
458
459 ifstream f;
460 f.open( sFileName.c_str(), ios::binary | ios::in );
461
462 if ( !f.good() || f.eof() || !f.is_open() )
463 {
464 return 0;
465 }
466
467 f.seekg( 0, ios::beg );
468 ifstream::pos_type begin_pos = f.tellg();
469 f.seekg( 0, std::ios::end );
470 llong len = static_cast<llong>( f.tellg() - begin_pos );
471 f.close();
472 return len;
473
474 }
475
476
477 struct SAscendingDateSort
478 {
operator ()SAscendingDateSort479 bool operator() ( bindex a, bindex b )
480 {
481 return a.start < b.start;
482 }
483 };
484
485 //order the index
OrderIndex()486 void CMgFileManager::OrderIndex()
487 { //按起点顺序从小到大排序
488
489 if ( m_Index.size() <= 1 )
490 return ;
491
492 sort( m_Index.begin(), m_Index.end(), SAscendingDateSort() );
493 }
494
495 //lastsize returned for sometime thread can't get filesize ahead till task finish
IsTaskFinish(llong & lastsize)496 bool CMgFileManager::IsTaskFinish( llong& lastsize )
497 {
498
499 if ( m_Index.size() == 1 &&
500 m_Index[ 0 ].start == 0 &&
501 m_Index[ 0 ].end == m_nFileLen
502 )
503 {
504
505 lastsize = m_nFileLen;
506 return true;
507 }
508 else
509 {
510 if ( m_nFileLen == 0 )
511 {
512 lastsize=0; return true;
513 } else {
514 lastsize = m_nFileLen;
515 return false;
516 }
517 }
518
519 }
520
521 //对索引合并,应该加锁使用
522 //对没有作标记的0长度索引,删除
MergeIndex()523 void CMgFileManager::MergeIndex()
524 {
525 if ( m_Index.empty() )
526 return ;
527
528 if ( m_Index.size() == 1 )
529 {
530 if ( !m_Index.front().mark && m_Index.front().start == m_Index.front().end )
531 {
532 m_Index.clear();
533 return ;
534 }
535 }
536
537 vector<bindex> icopy;
538
539 vector<bindex>::iterator it;
540
541 bindex lastindex = m_Index.front();
542
543 for ( it = m_Index.begin() + 1;it != m_Index.end();it++ )
544 {
545 if ( lastindex.end >= it->start )
546 {
547 if ( lastindex.end < it->end )
548 {
549 lastindex.end = it->end;
550 lastindex.mark = it->mark;
551 }
552 else
553 {
554 continue;
555 }
556 }
557 else
558 {
559 if ( !lastindex.mark && lastindex.start == lastindex.end )
560 {
561 }
562 else
563 {
564 icopy.push_back( lastindex );
565 }
566
567 lastindex = *it;
568 }
569 }
570
571 icopy.push_back( lastindex );
572
573 m_Index.swap( icopy );
574
575 }
576
577 //thread give his data by calling this function
578 //if >0 returned thread should continue to get data
579 //if =0 returned thread should stop.
580 //if <0 returned ,means something error with write data,thread should stop.
581 //tid is 1 based
FileData(int tid,llong offset,int len,void * data,bool end,int & us)582 int CMgFileManager::FileData( int tid, llong offset, int len, void *data, bool end, int& us )
583 {
584
585 //if posible ,check file length
586
587 if ( end )
588 {
589
590 llong flen = offset + len;
591
592 if ( m_nFileLen != -1 )
593 { //have size
594
595 if ( m_nFileLen != flen )
596 { //socket close maybe not indicate the file end.
597 //sometime overload server will close the connection
598 //so we don't use this info if we alread have file size
599 OutMsg( _MGSTR( _S_FILEMGR_ERRORCLOSESOCK ), MSG_WARNNING );
600 }
601 else
602 {
603 OutMsg( _MGSTR( _S_FILEMGR_FILELENCONFIRM ), MSG_SUCCESS );
604 }
605 }
606 else
607 { //no filesize,asume the flen is filesize,because server closed the connection
608 //no function convert long long to string
609 char lenstr[ 24 ];
610 sprintf( lenstr, "%lld", flen );
611 m_nFileLen = flen;
612 OutMsg( _MGSTR( _S_FILEMGR_ENDFILELEN ) + string( lenstr ) );
613 }
614 }
615
616 pthread_mutex_lock( &m_IndexMutex );
617
618 us = gSpeedCtrl.DataIn( len ); //speed control
619
620 bool combine;
621 //write data to file
622 int wret = WriteData( offset, len, data, combine );
623
624 if ( wret >= 0 )
625 {
626 WriteIndex(); //write index
627
628 if ( combine )
629 {
630 m_nSumBytes = SumBytes() - m_nOriginBytes;
631 }
632 else
633 {
634 m_nSumBytes += len; //record data to calcu speed
635 }
636 }
637 else
638 {
639 goto quit;
640 //pthread_mutex_unlock(&m_IndexMutex);
641 //return wret;
642 }
643
644 //OK,update ui data
645 if ( m_bUpdateUI && m_pShowWin )
646 {
647
648 struct bindex data[ 20 ];
649
650 for ( int i = 0;i < int( m_Index.size() ) && i < 20;i++ )
651 {
652 data[ i ] = m_Index[ i ];
653 }
654
655 m_pShowWin->SetData( m_Index.size(), data );
656
657 }
658
659 quit:
660
661 m_UrlList[ m_AntId2UrlIndex[ tid - 1 ] ].prior++; //multi url prior
662 m_AntSpeed[ tid - 1 ] ++;
663 m_SumPack++;
664
665
666 if ( m_UrlList.size() > 1 && m_nSwap > 0 )
667 {
668 if ( m_nSwap == tid )
669 {
670
671 if ( wret > 0 )
672 wret = -3;
673
674 memset( m_AntSpeed, 0, 10 * sizeof( int ) );
675
676 m_SumPack = 0;
677
678 m_nSwap = -1;
679
680 }
681
682 pthread_mutex_unlock( &m_IndexMutex );
683 return wret;
684
685 }
686
687 //10M check
688 if ( m_SumPack * RECVBUF > 10 * 1024 * 1024 && m_UrlList.size() > 1 )
689 {
690
691 //检查我们这个线程的速度和最大速度的差异,如果>=2就试图切换
692 //本线程速度=m_AntSpeed[tid-1];
693 //找最大速度
694 int maxspeed = -1;
695
696 for ( int i = 0;i < 10;i++ )
697 {
698 if ( m_AntSpeed[ i ] > maxspeed )
699 maxspeed = m_AntSpeed[ i ];
700 }
701
702 int minspeed = 1000000;
703 int minpos = -1;
704 //找最小速度
705
706 for ( int i = 0;i < 10;i++ )
707 {
708 if ( m_AntSpeed[ i ] > 0 && m_AntSpeed[ i ] < minspeed )
709 {
710 minspeed = m_AntSpeed[ i ];
711 minpos = i;
712 }
713 }
714
715 //check if have new mirror.
716 bool havenew = false;
717
718 pthread_mutex_lock( &m_UListMutex );
719
720 std::vector<_ul>::const_iterator itm;
721
722 for ( itm = m_UrlList.begin();itm != m_UrlList.end();itm++ )
723 {
724 if ( !itm->bused )
725 {
726 havenew = true;
727 break;
728 }
729 }
730
731 pthread_mutex_unlock( &m_UListMutex );
732
733 if ( havenew )
734 {
735 m_nSwap = minpos + 1;
736
737 if ( m_nSwap == tid )
738 {
739 if ( wret > 0 )
740 wret = -3;
741
742 memset( m_AntSpeed, 0, 10 * sizeof( int ) );
743
744 m_SumPack = 0;
745
746 m_nSwap = -1;
747 }
748 }
749
750 else if ( float( maxspeed ) / minspeed >= 2.0 )
751 {
752
753 m_nSwap = minpos + 1;
754
755 if ( m_nSwap == tid )
756 {
757
758 if ( wret > 0 )
759 wret = -3;
760
761 memset( m_AntSpeed, 0, 10 * sizeof( int ) );
762
763 m_SumPack = 0;
764
765 m_nSwap = -1;
766 }
767
768 }
769 else
770 { //wait for next
771 memset( m_AntSpeed, 0, 10 * sizeof( int ) );
772 m_SumPack = 0;
773 m_nSwap = -1;
774 }
775 }
776
777 pthread_mutex_unlock( &m_IndexMutex );
778 //if socks closed the end this session
779 //2006/10/09 comment out 2 lines
780 //if(end && wret<0) return wret;
781 //else if(end && wret>=0) return 0;
782 return wret;
783 }
784
785 //write data to file
786 //return:
787 // <0 error
788 // =0 merged with other block,stop
789 // >0 continue
790
WriteData(llong offset,int len,void * data,bool & combine)791 int CMgFileManager::WriteData( llong offset, int len, void *data, bool& combine )
792 {
793
794 FILE * fp;
795
796 combine = false;
797
798 //use wxwidgets preprocesser ,mac and bsd only have fopen
799 #if defined(WIN32) || defined(__BSD__) || defined(__WINDOWS__) || defined(__DARWIN__)
800
801 if ( NULL != ( fp = fopen( m_PartName.c_str(), "rb+" ) ) )
802 #else
803
804 if ( NULL != ( fp = fopen64( m_PartName.c_str(), "r+" ) ) )
805 #endif
806
807 {
808
809 #ifdef WIN32
810
811 if ( 0 != fseek( fp, offset, SEEK_SET ) )
812 #else
813
814 if ( 0 != fseeko( fp, offset, SEEK_SET ) )
815 #endif
816
817 {
818 OutMsg( "fseeko fail.", MSG_ERROR );
819 return -1;
820 }
821
822 //似乎要在这里防止数据过载,否则文件末尾会变化
823
824 if ( m_nFileLen != -1 )
825 { //know filelen
826
827 if ( offset + ( llong ) len <= m_nFileLen )
828 {
829 if ( len != int( fwrite( data, 1, len, fp ) ) )
830 {
831 OutMsg( "fwrite fail.", MSG_ERROR );
832 return -1;
833 }
834 }
835 else if ( m_nFileLen - offset > 0 )
836 {
837 OutMsg( "data overload", MSG_ERROR );
838
839 if ( m_nFileLen - offset != int( fwrite( data, 1, m_nFileLen - offset, fp ) ) )
840 {
841 OutMsg( "fwrite fail.", MSG_ERROR );
842 return -1;
843 }
844 }
845 else
846 {
847 OutMsg( "data overload", MSG_ERROR );
848 }
849 }
850 else //unknow filelen
851 {
852 if ( len != int( fwrite( data, 1, len, fp ) ) )
853 {
854 OutMsg( "fwrite fail.", MSG_ERROR );
855 return -1;
856 }
857 }
858
859 fclose( fp );
860 //是否继续下载要判断是否这段数据已经连接到了下一个已下载的数据段
861 //首先找到这个数据段在索引中的项目,然后查找下一个索引的起点位置,如果能够重合,就停止本线程下载
862 //bool found=false;
863
864 for ( int i = 0;i < int( m_Index.size() );i++ )
865 {
866 if ( m_Index[ i ].end == offset )
867 {
868 //found
869
870 if ( i == int( m_Index.size() - 1 ) )
871 { //last block
872
873 m_Index[ i ].end += len;
874
875 if ( m_nFileLen != -1 )
876 { //have filelen,we can judge if data end
877
878 if ( m_Index[ i ].end > m_nFileLen )
879 {
880 OutMsg( _MGSTR( _S_FILEMGR_DATAOVERLOAD ), MSG_WARNNING );
881 m_Index[ i ].end = m_nFileLen;
882 combine = true;
883 }
884
885 return m_Index[ i ].end == m_nFileLen ? 0 : 1;
886 }
887 else //2006/10/02 add
888 { //no filelen,we can't judge if data end,just continue
889 return 1;
890 }
891 }
892 else if ( m_Index[ i + 1 ].start <= offset + len )
893 { //block merge with next block
894 //是否让本线程退出取决于数据是否超越了下一个块的末尾,如果超越了下个快的末尾
895 //那么应该让本线程继续,否则应该退出本线程,继续选择下一个下载点。
896 int nret = m_Index[ i + 1 ].end > offset + len ? 0 : 1;
897 m_Index[ i ].end += len;
898 combine = true;
899 MergeIndex();
900 return nret;
901 }
902 else
903 { //no merge with other,continue
904 m_Index[ i ].end += len;
905 return 1;
906 }
907 }
908 }
909
910 //not found the data head,maybe this block is overlapped by others,break.
911 combine = true;
912
913 return 0;
914
915 }
916
917 OutMsg( "open datafile error when write !!", MSG_ERROR );
918 return -1;
919 }
920
921
922 //2006/07/06修正一些细节
923 //这个函数会对任务结束做出一个判断
924 //tid is 1 based
925 //返回1成功,返回0失败
GetTask(int tid,llong & taskpos)926 bool CMgFileManager::GetTask( int tid, llong &taskpos )
927 {
928 //提取一个任务片的起点
929 //算法:先搜现有块的末尾,如果有空的分配出去,如果没空的,搜索一个间隔最大的空白区域,在中间插入起点。
930
931 if ( m_bStop )
932 {
933 OutMsg( "task canceled, no more work.", MSG_INFO );
934 return false;
935 }
936
937 if ( m_nFileLen == 0 ) {
938 OutMsg( "zero length task, no work" , MSG_INFO );
939 return false; //a zero length task
940 }
941
942 pthread_mutex_lock( &m_IndexMutex );
943 //寻找原来的点然后注销标记
944 vector<bindex>::iterator it;
945
946 for ( it = m_Index.begin();it != m_Index.end();it++ )
947 {
948 if ( it->mark == tid )
949 {
950 it->mark = 0;
951 break;
952 }
953 }
954
955 OrderIndex();
956 MergeIndex();
957
958 if ( m_Index.empty() )
959 {
960 AddNewIndex( tid, 0 );
961 taskpos = 0L;
962 goto quit1;
963 }
964 else if ( m_Index.size() == 1 )
965 {
966 if ( m_Index[ 0 ].start == 0 && m_Index[ 0 ].end == m_nFileLen )
967 { //in fact,task finish
968 goto quit0;
969 }
970 else if ( m_Index[ 0 ].start != 0 )
971 { //没从起点开始分配,加一个起点
972 AddNewIndex( tid, 0 );
973 taskpos = 0L;
974 goto quit1;
975 }
976 else
977 {
978 if ( !m_Index[ 0 ].mark )
979 { //唯一的索引还没分配出去
980 taskpos = m_Index[ 0 ].end;
981 m_Index[ 0 ].mark = tid ;
982 goto quit1;
983 }
984 else
985 { //唯一的索引已经分配出去了
986
987 if ( m_nFileLen - m_Index[ 0 ].end < RECVBUF )
988 { //to little data to split
989 goto quit0;
990 }
991 else
992 {
993 llong newpos = ( m_Index[ 0 ].end + m_nFileLen ) / 2 - 1;
994 AddNewIndex( tid, newpos );
995 taskpos = newpos;
996 goto quit1;
997 }
998 }
999 }
1000 }
1001 else
1002 { //item>1
1003 //looking for a index not delivered
1004
1005 vector<bindex>::iterator it;
1006
1007 for ( it = m_Index.begin();it != m_Index.end();it++ )
1008 {
1009
1010 if ( !it->mark && it->end < m_nFileLen )
1011 { //文件的末尾不要再分配线程下载08/07
1012 it->mark = tid ;
1013 taskpos = it->end;
1014 goto quit1;
1015 }
1016 }
1017
1018 //没有索引可用,找最大的空白插入
1019 llong maxpos = 0;
1020
1021 llong maxempty = 0;
1022
1023 if ( m_Index[ 0 ].start != 0 )
1024 { //起始空白
1025 maxpos = 0;
1026 maxempty = m_Index[ 0 ].start;
1027 }
1028
1029 if ( m_nFileLen - m_Index.back().end > maxempty )
1030 { //末尾空白
1031 maxpos = m_Index.back().end;
1032 maxempty = m_nFileLen - m_Index.back().end;
1033 }
1034
1035 //中间段空白
1036 for ( int i = 1;i < int( m_Index.size() );i++ )
1037 {
1038 if ( m_Index[ i ].start - m_Index[ i - 1 ].end > maxempty )
1039 {
1040 maxpos = m_Index[ i - 1 ].end;
1041 maxempty = m_Index[ i ].start - m_Index[ i - 1 ].end;
1042 }
1043 }
1044
1045 //找到了最大的空白。
1046 if ( maxempty < RECVBUF )
1047 {
1048 //WriteIndex();
1049 goto quit0;
1050 }
1051 else
1052 {
1053 taskpos = maxpos + maxempty / 2 - 1 ;
1054 AddNewIndex( tid, taskpos );
1055 goto quit1;
1056 }
1057 }
1058
1059 quit1:
1060 pthread_mutex_unlock( &m_IndexMutex );
1061 return true;
1062
1063 quit0:
1064 pthread_mutex_unlock( &m_IndexMutex );
1065 return false;
1066 }
1067
1068 //因为是内部函数,控制锁不在这里面加
1069 //添加索引必须要分配出去
1070 //tid is 1 based
AddNewIndex(int tid,llong pos)1071 void CMgFileManager::AddNewIndex( int tid, llong pos )
1072 {
1073 bindex bi;
1074 bi.start = bi.end = pos;
1075 //mark is 1 based!
1076 bi.mark = tid ;
1077 m_Index.push_back( bi );
1078 OrderIndex();
1079 }
1080
1081 //num should enough big
GetTaskIndex(bindex data[],int num)1082 int CMgFileManager::GetTaskIndex( bindex data[], int num )
1083 {
1084 pthread_mutex_lock( &m_IndexMutex );
1085 int size = m_Index.size();
1086
1087 if ( size > num )
1088 return 0;
1089
1090 for ( int i = 0;i < size ;i++ )
1091 data[ i ] = m_Index[ i ];
1092
1093 pthread_mutex_unlock( &m_IndexMutex );
1094
1095 return size;
1096 }
1097
UpdateRunningTable(CShowTableWindow * pwin)1098 void CMgFileManager::UpdateRunningTable( CShowTableWindow* pwin )
1099 {
1100
1101 struct bindex data[ 20 ];
1102 pthread_mutex_lock( &m_IndexMutex );
1103 int nsize = m_Index.size();
1104
1105 for ( int i = 0;i < nsize && i < 20;i++ )
1106 {
1107 data[ i ] = m_Index[ i ];
1108 }
1109
1110 pthread_mutex_unlock( &m_IndexMutex );
1111 pwin->SetRange( m_nFileLen, 0, nsize, data );
1112 m_pShowWin = pwin;
1113 m_bUpdateUI = true;
1114 }
1115
CloseUpdateUI()1116 void CMgFileManager::CloseUpdateUI()
1117 {
1118 m_bUpdateUI = false;
1119 m_pShowWin = NULL;
1120 }
1121
Stop()1122 void CMgFileManager::Stop()
1123 {
1124 m_bUpdateUI = false;
1125 m_pShowWin = NULL;
1126 m_bStop = true;
1127
1128 }
1129
OutMsg(string str,_MSGTYPE type)1130 void CMgFileManager::OutMsg( string str , _MSGTYPE type )
1131 {
1132 m_pParent->OutMsg( -2, str, type );
1133 }
1134
HeartBeat()1135 void CMgFileManager::HeartBeat()
1136 {
1137
1138 if ( m_ByteSeq.size() < 10 )
1139 {
1140 m_ByteSeq.push_back( m_nSumBytes );
1141 }
1142 else
1143 {
1144 m_ByteSeq.pop_front();
1145 m_ByteSeq.push_back( m_nSumBytes );
1146 }
1147
1148 int beats = m_ByteSeq.size();
1149 int diff = m_nSumBytes - m_ByteSeq.front();
1150 m_nSpeed = diff / beats;
1151
1152 if ( m_nSpeed != 0 && m_nFileLen != -1 && m_nFileLen != 0 )
1153 {
1154 m_nTimeToFinish = ( m_nFileLen - m_nSumBytes - m_nOriginBytes ) / m_nSpeed;
1155 }
1156 else
1157 {
1158 m_nTimeToFinish = -1;
1159 }
1160
1161 if ( m_nFileLen != -1 && m_nFileLen != 0 )
1162 {
1163 m_fRatio = float( m_nSumBytes + m_nOriginBytes ) / m_nFileLen;
1164 }
1165 }
1166
1167 //no lock for this func
SumBytes()1168 llong CMgFileManager::SumBytes()
1169 {
1170 vector<bindex>::const_iterator it;
1171
1172 llong sum = 0;
1173
1174 for ( it = m_Index.begin(); it != m_Index.end(); it++ )
1175 {
1176 sum += it->end - it->start;
1177 }
1178
1179 return sum;
1180 }
1181
GetRunningArgs(float & fratio,llong & finish,int & speed,int & ttf)1182 void CMgFileManager::GetRunningArgs( float& fratio, llong& finish, int& speed, int& ttf )
1183 {
1184 fratio = m_fRatio;
1185 finish = m_nSumBytes + m_nOriginBytes;
1186 speed = m_nSpeed;
1187 ttf = m_nTimeToFinish;
1188 }
1189
1190 //选取Url地址采用积分制,积分规则
1191 //1:某地址接收到一个数据包,积分+1
1192 //2:某个地址连接报告正常+500
1193 //3:某个地址连接报告失败-5000,无效-100000
1194 //4:主地址有20000个预设积分
1195 //5:没被使用的地址有10000预设积分,一旦使用了就清零
1196 //6:每次选择积分最高的地址返回
1197 //7:地址分配出去-500
1198 //8:跳转地址+100
1199
GetTask(int aid,std::string & fullurl,bool & urlneedcheck,llong & from,std::string & refer)1200 bool CMgFileManager::GetTask(
1201 int aid,
1202 std::string& fullurl,
1203 bool& urlneedcheck, //这个url是否需要检查正确性
1204 llong& from,
1205 std::string& refer
1206 )
1207 {
1208
1209 if ( !GetTask( aid, from ) )
1210 {
1211 OutMsg( _MGSTR( _S_FILEMGR_NOTASK ), MSG_INFO );
1212 return false;
1213 }
1214
1215 pthread_mutex_lock( &m_UListMutex );
1216
1217 int lsize = m_UrlList.size();
1218 int maxprior = -100000;
1219 int maxpos = -1;
1220
1221 for ( int i = 0;i < lsize;i++ )
1222 {
1223 if ( m_UrlList[ i ].prior > maxprior )
1224 {
1225 maxprior = m_UrlList[ i ].prior;
1226 maxpos = i;
1227 }
1228 }
1229
1230 if ( maxpos == -1 )
1231 {
1232 //list empty or all url invalid
1233 //return main url,for retry need a task url.
1234 //OutMsg("no heathy mirror url ,use main url",MSG_WARNNING);
1235 maxpos = 0;
1236 }
1237
1238 if ( !m_UrlList[ maxpos ].bused )
1239 {
1240 m_UrlList[ maxpos ].bused = true;
1241 m_UrlList[ maxpos ].prior = 0; //first use clear prior
1242 urlneedcheck = maxpos != 0 ? true : false; //first time need check if not main.
1243
1244 }
1245 else
1246 {
1247
1248 urlneedcheck = false; //not a first time use,don't check
1249 m_UrlList[ maxpos ].prior -= 500; //minus for use
1250 }
1251
1252
1253 fullurl = m_UrlList[ maxpos ].url;
1254 refer = m_UrlList[ maxpos ].refer;
1255 m_AntId2UrlIndex[ aid - 1 ] = maxpos;
1256
1257 memset( m_AntSpeed, 0, 10 * sizeof( int ) ); //clear speed table
1258 m_SumPack = 0;
1259
1260
1261 pthread_mutex_unlock( &m_UListMutex );
1262 return true;
1263
1264 }
1265
1266 //report url good or bad
ReportUrl(int adjust,std::string url)1267 void CMgFileManager::ReportUrl( int adjust, std::string url )
1268 {
1269 pthread_mutex_lock( &m_UListMutex );
1270 int lsize = m_UrlList.size();
1271
1272 for ( int i = 0; i < lsize; i++ )
1273 {
1274 if ( m_UrlList[ i ].url == url )
1275 {
1276 m_UrlList[ i ].prior += adjust; //normal +200,temp fail -5000,invalid -100000
1277
1278 if ( adjust > 0 )
1279 OutMsg( _MGSTR( _S_FILEMGR_URLOK ) + url, MSG_SUCCESS );
1280 else if ( adjust <= -100000 )
1281 OutMsg( _MGSTR( _S_FILEMGR_URLFAIL ) + url, MSG_ERROR );
1282 else
1283 OutMsg( _MGSTR( _S_FILEMGR_URLTEMPFAIL ) + url, MSG_WARNNING );
1284
1285 break;
1286 }
1287 }
1288
1289 pthread_mutex_unlock( &m_UListMutex );
1290 }
1291
ReportRedirect(std::string origin,std::string redirect)1292 void CMgFileManager::ReportRedirect( std::string origin, std::string redirect )
1293 {
1294 pthread_mutex_lock( &m_UListMutex );
1295 int lsize = m_UrlList.size();
1296
1297 for ( int i = 0;i < lsize;i++ )
1298 {
1299 if ( m_UrlList[ i ].url == origin )
1300 {
1301 m_UrlList[ i ].refer = m_UrlList[ i ].url;
1302 m_UrlList[ i ].url = redirect;
1303 m_UrlList[ i ].prior += 100; //an active url.
1304 OutMsg( origin + _MGSTR( _S_FILEMGR_REDIRECT ) + redirect, MSG_INFO );
1305 break;
1306 }
1307 }
1308
1309 pthread_mutex_unlock( &m_UListMutex );
1310 }
1311
1312 //must filter out the same
AddMirrorUrl(std::string url)1313 void CMgFileManager::AddMirrorUrl( std::string url )
1314 {
1315
1316 _ul tt;
1317 pthread_mutex_lock( &m_UListMutex );
1318
1319 tt.prior = 10000; //new url have high prior
1320 tt.url = url;
1321 tt.bused = false;
1322
1323 bool bExists = false;
1324
1325 //check if any same url exists
1326
1327 std::vector<_ul>::const_iterator it;
1328
1329 for ( it = m_UrlList.begin();it != m_UrlList.end();it++ )
1330 {
1331 if ( url == it->url )
1332 {
1333 bExists = true;
1334 break;
1335 }
1336 }
1337
1338 if ( !bExists )
1339 m_UrlList.push_back( tt );
1340
1341 pthread_mutex_unlock( &m_UListMutex );
1342 }
1343