1 /* <!-- copyright */
2 /*
3  * aria2 - The high speed download utility
4  *
5  * Copyright (C) 2006 Tatsuhiro Tsujikawa
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20  *
21  * In addition, as a special exception, the copyright holders give
22  * permission to link the code of portions of this program with the
23  * OpenSSL library under certain conditions as described in each
24  * individual source file, and distribute linked combinations
25  * including the two.
26  * You must obey the GNU General Public License in all respects
27  * for all of the code used other than OpenSSL.  If you modify
28  * file(s) with this exception, you may extend this exception to your
29  * version of the file(s), but you are not obligated to do so.  If you
30  * do not wish to do so, delete this exception statement from your
31  * version.  If you delete this exception statement from all source
32  * files in the program, then also delete it here.
33  */
34 /* copyright --> */
35 #include "BtPieceMessage.h"
36 
37 #include <cstring>
38 #include <cstdlib>
39 #include <cassert>
40 
41 #include "bittorrent_helper.h"
42 #include "util.h"
43 #include "message.h"
44 #include "DlAbortEx.h"
45 #include "message_digest_helper.h"
46 #include "DiskAdaptor.h"
47 #include "Logger.h"
48 #include "LogFactory.h"
49 #include "Peer.h"
50 #include "Piece.h"
51 #include "PieceStorage.h"
52 #include "BtMessageDispatcher.h"
53 #include "BtMessageFactory.h"
54 #include "BtRequestFactory.h"
55 #include "PeerConnection.h"
56 #include "fmt.h"
57 #include "DownloadContext.h"
58 #include "PeerStorage.h"
59 #include "array_fun.h"
60 #include "WrDiskCache.h"
61 #include "WrDiskCacheEntry.h"
62 #include "DownloadFailureException.h"
63 #include "BtRejectMessage.h"
64 
65 namespace aria2 {
66 
// Name of this message type. It is handed to the AbstractBtMessage base by
// the constructor, to the payload validators in create(), and printed by
// toString().
const char BtPieceMessage::NAME[] = "piece";
68 
BtPieceMessage(size_t index,int32_t begin,int32_t blockLength)69 BtPieceMessage::BtPieceMessage(size_t index, int32_t begin, int32_t blockLength)
70     : AbstractBtMessage(ID, NAME),
71       index_(index),
72       begin_(begin),
73       blockLength_(blockLength),
74       data_(nullptr),
75       downloadContext_(nullptr),
76       peerStorage_(nullptr)
77 {
78   setUploading(true);
79 }
80 
// Out-of-line defaulted destructor; no explicit cleanup is performed here.
BtPieceMessage::~BtPieceMessage() = default;
82 
setMsgPayload(const unsigned char * data)83 void BtPieceMessage::setMsgPayload(const unsigned char* data) { data_ = data; }
84 
85 std::unique_ptr<BtPieceMessage>
create(const unsigned char * data,size_t dataLength)86 BtPieceMessage::create(const unsigned char* data, size_t dataLength)
87 {
88   bittorrent::assertPayloadLengthGreater(9, dataLength, NAME);
89   bittorrent::assertID(ID, data, NAME);
90   return make_unique<BtPieceMessage>(bittorrent::getIntParam(data, 1),
91                                      bittorrent::getIntParam(data, 5),
92                                      dataLength - 9);
93 }
94 
doReceivedAction()95 void BtPieceMessage::doReceivedAction()
96 {
97   if (isMetadataGetMode()) {
98     return;
99   }
100   auto slot = getBtMessageDispatcher()->getOutstandingRequest(index_, begin_,
101                                                               blockLength_);
102   getPeer()->updateDownload(blockLength_);
103   downloadContext_->updateDownload(blockLength_);
104   if (slot) {
105     getPeer()->snubbing(false);
106     std::shared_ptr<Piece> piece = getPieceStorage()->getPiece(index_);
107     int64_t offset =
108         static_cast<int64_t>(index_) * downloadContext_->getPieceLength() +
109         begin_;
110     A2_LOG_DEBUG(fmt(MSG_PIECE_RECEIVED, getCuid(),
111                      static_cast<unsigned long>(index_), begin_, blockLength_,
112                      static_cast<int64_t>(offset),
113                      static_cast<unsigned long>(slot->getBlockIndex())));
114     if (piece->hasBlock(slot->getBlockIndex())) {
115       A2_LOG_DEBUG("Already have this block.");
116       return;
117     }
118     if (piece->getWrDiskCacheEntry()) {
119       // Write Disk Cache enabled. Unfortunately, it incurs extra data
120       // copy.
121       auto dataCopy = new unsigned char[blockLength_];
122       memcpy(dataCopy, data_ + 9, blockLength_);
123       piece->updateWrCache(getPieceStorage()->getWrDiskCache(), dataCopy, 0,
124                            blockLength_, blockLength_, offset);
125     }
126     else {
127       getPieceStorage()->getDiskAdaptor()->writeData(data_ + 9, blockLength_,
128                                                      offset);
129     }
130     piece->completeBlock(slot->getBlockIndex());
131     A2_LOG_DEBUG(fmt(
132         MSG_PIECE_BITFIELD, getCuid(),
133         util::toHex(piece->getBitfield(), piece->getBitfieldLength()).c_str()));
134     piece->updateHash(begin_, data_ + 9, blockLength_);
135     getBtMessageDispatcher()->removeOutstandingRequest(slot);
136     if (piece->pieceComplete()) {
137       if (checkPieceHash(piece)) {
138         onNewPiece(piece);
139       }
140       else {
141         onWrongPiece(piece);
142         peerStorage_->addBadPeer(getPeer()->getIPAddress());
143         throw DL_ABORT_EX("Bad piece hash.");
144       }
145     }
146   }
147   else {
148     A2_LOG_DEBUG(fmt("CUID#%" PRId64
149                      " - RequestSlot not found, index=%lu, begin=%d",
150                      getCuid(), static_cast<unsigned long>(index_), begin_));
151   }
152 }
153 
namespace {
// Size in bytes of the fixed header that precedes the block data of an
// outgoing piece message:
//   4 (length prefix) + 1 (message id) + 4 (piece index) + 4 (begin) = 13
constexpr size_t MESSAGE_HEADER_LENGTH = 4 + 1 + 4 + 4;
} // namespace
157 
createMessageHeader(unsigned char * msgHeader) const158 void BtPieceMessage::createMessageHeader(unsigned char* msgHeader) const
159 {
160   /**
161    * len --- 9+blockLength, 4bytes
162    * id --- 7, 1byte
163    * index --- index, 4bytes
164    * begin --- begin, 4bytes
165    * total: 13bytes
166    */
167   bittorrent::createPeerMessageString(msgHeader, MESSAGE_HEADER_LENGTH,
168                                       9 + blockLength_, ID);
169   bittorrent::setIntParam(&msgHeader[5], index_);
170   bittorrent::setIntParam(&msgHeader[9], begin_);
171 }
172 
getMessageHeaderLength()173 size_t BtPieceMessage::getMessageHeaderLength()
174 {
175   return MESSAGE_HEADER_LENGTH;
176 }
177 
178 namespace {
179 struct PieceSendUpdate : public ProgressUpdate {
PieceSendUpdatearia2::__anonb2a7f1590211::PieceSendUpdate180   PieceSendUpdate(DownloadContext* dctx, std::shared_ptr<Peer> peer,
181                   size_t headerLength)
182       : dctx(dctx), peer(std::move(peer)), headerLength(headerLength)
183   {
184   }
updatearia2::__anonb2a7f1590211::PieceSendUpdate185   virtual void update(size_t length, bool complete) CXX11_OVERRIDE
186   {
187     if (headerLength > 0) {
188       size_t m = std::min(headerLength, length);
189       headerLength -= m;
190       length -= m;
191     }
192     peer->updateUploadLength(length);
193     dctx->updateUploadLength(length);
194   }
195   DownloadContext* dctx;
196   std::shared_ptr<Peer> peer;
197   size_t headerLength;
198 };
199 } // namespace
200 
send()201 void BtPieceMessage::send()
202 {
203   if (isInvalidate()) {
204     return;
205   }
206   A2_LOG_INFO(fmt(MSG_SEND_PEER_MESSAGE, getCuid(),
207                   getPeer()->getIPAddress().c_str(), getPeer()->getPort(),
208                   toString().c_str()));
209   int64_t pieceDataOffset =
210       static_cast<int64_t>(index_) * downloadContext_->getPieceLength() +
211       begin_;
212   pushPieceData(pieceDataOffset, blockLength_);
213 }
214 
pushPieceData(int64_t offset,int32_t length) const215 void BtPieceMessage::pushPieceData(int64_t offset, int32_t length) const
216 {
217   assert(length <= static_cast<int32_t>(MAX_BLOCK_LENGTH));
218   auto buf = std::vector<unsigned char>(length + MESSAGE_HEADER_LENGTH);
219   createMessageHeader(buf.data());
220   ssize_t r;
221   r = getPieceStorage()->getDiskAdaptor()->readData(
222       buf.data() + MESSAGE_HEADER_LENGTH, length, offset);
223   if (r == length) {
224     const auto& peer = getPeer();
225     getPeerConnection()->pushBytes(
226         std::move(buf), make_unique<PieceSendUpdate>(downloadContext_, peer,
227                                                      MESSAGE_HEADER_LENGTH));
228     peer->updateUploadSpeed(length);
229     downloadContext_->updateUploadSpeed(length);
230   }
231   else {
232     throw DL_ABORT_EX(EX_DATA_READ);
233   }
234 }
235 
toString() const236 std::string BtPieceMessage::toString() const
237 {
238   return fmt("%s index=%lu, begin=%d, length=%d", NAME,
239              static_cast<unsigned long>(index_), begin_, blockLength_);
240 }
241 
checkPieceHash(const std::shared_ptr<Piece> & piece)242 bool BtPieceMessage::checkPieceHash(const std::shared_ptr<Piece>& piece)
243 {
244   if (!getPieceStorage()->isEndGame() && piece->isHashCalculated()) {
245     A2_LOG_DEBUG(fmt("Hash is available!! index=%lu",
246                      static_cast<unsigned long>(piece->getIndex())));
247     return piece->getDigest() ==
248            downloadContext_->getPieceHash(piece->getIndex());
249   }
250   else {
251     A2_LOG_DEBUG(fmt("Calculating hash index=%lu",
252                      static_cast<unsigned long>(piece->getIndex())));
253     try {
254       return piece->getDigestWithWrCache(downloadContext_->getPieceLength(),
255                                          getPieceStorage()->getDiskAdaptor()) ==
256              downloadContext_->getPieceHash(piece->getIndex());
257     }
258     catch (RecoverableException& e) {
259       piece->clearAllBlock(getPieceStorage()->getWrDiskCache());
260       throw;
261     }
262   }
263 }
264 
onNewPiece(const std::shared_ptr<Piece> & piece)265 void BtPieceMessage::onNewPiece(const std::shared_ptr<Piece>& piece)
266 {
267   if (piece->getWrDiskCacheEntry()) {
268     // We flush cached data whenever an whole piece is retrieved.
269     piece->flushWrCache(getPieceStorage()->getWrDiskCache());
270     if (piece->getWrDiskCacheEntry()->getError() !=
271         WrDiskCacheEntry::CACHE_ERR_SUCCESS) {
272       piece->clearAllBlock(getPieceStorage()->getWrDiskCache());
273       throw DOWNLOAD_FAILURE_EXCEPTION2(
274           fmt("Write disk cache flush failure index=%lu",
275               static_cast<unsigned long>(piece->getIndex())),
276           piece->getWrDiskCacheEntry()->getErrorCode());
277     }
278   }
279   A2_LOG_INFO(fmt(MSG_GOT_NEW_PIECE, getCuid(),
280                   static_cast<unsigned long>(piece->getIndex())));
281   getPieceStorage()->completePiece(piece);
282   getPieceStorage()->advertisePiece(getCuid(), piece->getIndex(),
283                                     global::wallclock());
284 }
285 
onWrongPiece(const std::shared_ptr<Piece> & piece)286 void BtPieceMessage::onWrongPiece(const std::shared_ptr<Piece>& piece)
287 {
288   A2_LOG_INFO(fmt(MSG_GOT_WRONG_PIECE, getCuid(),
289                   static_cast<unsigned long>(piece->getIndex())));
290   piece->clearAllBlock(getPieceStorage()->getWrDiskCache());
291   piece->destroyHashContext();
292   getBtRequestFactory()->removeTargetPiece(piece);
293 }
294 
onChokingEvent(const BtChokingEvent & event)295 void BtPieceMessage::onChokingEvent(const BtChokingEvent& event)
296 {
297   if (!isInvalidate() && !getPeer()->isInAmAllowedIndexSet(index_)) {
298     A2_LOG_DEBUG(fmt(MSG_REJECT_PIECE_CHOKED, getCuid(),
299                      static_cast<unsigned long>(index_), begin_, blockLength_));
300     if (getPeer()->isFastExtensionEnabled()) {
301       getBtMessageDispatcher()->addMessageToQueue(
302           getBtMessageFactory()->createRejectMessage(index_, begin_,
303                                                      blockLength_));
304     }
305     setInvalidate(true);
306   }
307 }
308 
onCancelSendingPieceEvent(const BtCancelSendingPieceEvent & event)309 void BtPieceMessage::onCancelSendingPieceEvent(
310     const BtCancelSendingPieceEvent& event)
311 {
312   if (!isInvalidate() && index_ == event.getIndex() &&
313       begin_ == event.getBegin() && blockLength_ == event.getLength()) {
314     A2_LOG_DEBUG(fmt(MSG_REJECT_PIECE_CANCEL, getCuid(),
315                      static_cast<unsigned long>(index_), begin_, blockLength_));
316     if (getPeer()->isFastExtensionEnabled()) {
317       getBtMessageDispatcher()->addMessageToQueue(
318           getBtMessageFactory()->createRejectMessage(index_, begin_,
319                                                      blockLength_));
320     }
321     setInvalidate(true);
322   }
323 }
324 
setDownloadContext(DownloadContext * downloadContext)325 void BtPieceMessage::setDownloadContext(DownloadContext* downloadContext)
326 {
327   downloadContext_ = downloadContext;
328 }
329 
setPeerStorage(PeerStorage * peerStorage)330 void BtPieceMessage::setPeerStorage(PeerStorage* peerStorage)
331 {
332   peerStorage_ = peerStorage;
333 }
334 
335 } // namespace aria2
336