1 /* <!-- copyright */
2 /*
3 * aria2 - The high speed download utility
4 *
5 * Copyright (C) 2006 Tatsuhiro Tsujikawa
6 *
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License as published by
9 * the Free Software Foundation; either version 2 of the License, or
10 * (at your option) any later version.
11 *
12 * This program is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
16 *
17 * You should have received a copy of the GNU General Public License
18 * along with this program; if not, write to the Free Software
19 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
20 *
21 * In addition, as a special exception, the copyright holders give
22 * permission to link the code of portions of this program with the
23 * OpenSSL library under certain conditions as described in each
24 * individual source file, and distribute linked combinations
25 * including the two.
26 * You must obey the GNU General Public License in all respects
27 * for all of the code used other than OpenSSL. If you modify
28 * file(s) with this exception, you may extend this exception to your
29 * version of the file(s), but you are not obligated to do so. If you
30 * do not wish to do so, delete this exception statement from your
31 * version. If you delete this exception statement from all source
32 * files in the program, then also delete it here.
33 */
34 /* copyright --> */
35 #include "BtPieceMessage.h"
36
37 #include <cstring>
38 #include <cstdlib>
39 #include <cassert>
40
41 #include "bittorrent_helper.h"
42 #include "util.h"
43 #include "message.h"
44 #include "DlAbortEx.h"
45 #include "message_digest_helper.h"
46 #include "DiskAdaptor.h"
47 #include "Logger.h"
48 #include "LogFactory.h"
49 #include "Peer.h"
50 #include "Piece.h"
51 #include "PieceStorage.h"
52 #include "BtMessageDispatcher.h"
53 #include "BtMessageFactory.h"
54 #include "BtRequestFactory.h"
55 #include "PeerConnection.h"
56 #include "fmt.h"
57 #include "DownloadContext.h"
58 #include "PeerStorage.h"
59 #include "array_fun.h"
60 #include "WrDiskCache.h"
61 #include "WrDiskCacheEntry.h"
62 #include "DownloadFailureException.h"
63 #include "BtRejectMessage.h"
64
65 namespace aria2 {
66
67 const char BtPieceMessage::NAME[] = "piece";
68
BtPieceMessage(size_t index,int32_t begin,int32_t blockLength)69 BtPieceMessage::BtPieceMessage(size_t index, int32_t begin, int32_t blockLength)
70 : AbstractBtMessage(ID, NAME),
71 index_(index),
72 begin_(begin),
73 blockLength_(blockLength),
74 data_(nullptr),
75 downloadContext_(nullptr),
76 peerStorage_(nullptr)
77 {
78 setUploading(true);
79 }
80
81 BtPieceMessage::~BtPieceMessage() = default;
82
setMsgPayload(const unsigned char * data)83 void BtPieceMessage::setMsgPayload(const unsigned char* data) { data_ = data; }
84
85 std::unique_ptr<BtPieceMessage>
create(const unsigned char * data,size_t dataLength)86 BtPieceMessage::create(const unsigned char* data, size_t dataLength)
87 {
88 bittorrent::assertPayloadLengthGreater(9, dataLength, NAME);
89 bittorrent::assertID(ID, data, NAME);
90 return make_unique<BtPieceMessage>(bittorrent::getIntParam(data, 1),
91 bittorrent::getIntParam(data, 5),
92 dataLength - 9);
93 }
94
doReceivedAction()95 void BtPieceMessage::doReceivedAction()
96 {
97 if (isMetadataGetMode()) {
98 return;
99 }
100 auto slot = getBtMessageDispatcher()->getOutstandingRequest(index_, begin_,
101 blockLength_);
102 getPeer()->updateDownload(blockLength_);
103 downloadContext_->updateDownload(blockLength_);
104 if (slot) {
105 getPeer()->snubbing(false);
106 std::shared_ptr<Piece> piece = getPieceStorage()->getPiece(index_);
107 int64_t offset =
108 static_cast<int64_t>(index_) * downloadContext_->getPieceLength() +
109 begin_;
110 A2_LOG_DEBUG(fmt(MSG_PIECE_RECEIVED, getCuid(),
111 static_cast<unsigned long>(index_), begin_, blockLength_,
112 static_cast<int64_t>(offset),
113 static_cast<unsigned long>(slot->getBlockIndex())));
114 if (piece->hasBlock(slot->getBlockIndex())) {
115 A2_LOG_DEBUG("Already have this block.");
116 return;
117 }
118 if (piece->getWrDiskCacheEntry()) {
119 // Write Disk Cache enabled. Unfortunately, it incurs extra data
120 // copy.
121 auto dataCopy = new unsigned char[blockLength_];
122 memcpy(dataCopy, data_ + 9, blockLength_);
123 piece->updateWrCache(getPieceStorage()->getWrDiskCache(), dataCopy, 0,
124 blockLength_, blockLength_, offset);
125 }
126 else {
127 getPieceStorage()->getDiskAdaptor()->writeData(data_ + 9, blockLength_,
128 offset);
129 }
130 piece->completeBlock(slot->getBlockIndex());
131 A2_LOG_DEBUG(fmt(
132 MSG_PIECE_BITFIELD, getCuid(),
133 util::toHex(piece->getBitfield(), piece->getBitfieldLength()).c_str()));
134 piece->updateHash(begin_, data_ + 9, blockLength_);
135 getBtMessageDispatcher()->removeOutstandingRequest(slot);
136 if (piece->pieceComplete()) {
137 if (checkPieceHash(piece)) {
138 onNewPiece(piece);
139 }
140 else {
141 onWrongPiece(piece);
142 peerStorage_->addBadPeer(getPeer()->getIPAddress());
143 throw DL_ABORT_EX("Bad piece hash.");
144 }
145 }
146 }
147 else {
148 A2_LOG_DEBUG(fmt("CUID#%" PRId64
149 " - RequestSlot not found, index=%lu, begin=%d",
150 getCuid(), static_cast<unsigned long>(index_), begin_));
151 }
152 }
153
154 namespace {
155 constexpr size_t MESSAGE_HEADER_LENGTH = 13;
156 } // namespace
157
createMessageHeader(unsigned char * msgHeader) const158 void BtPieceMessage::createMessageHeader(unsigned char* msgHeader) const
159 {
160 /**
161 * len --- 9+blockLength, 4bytes
162 * id --- 7, 1byte
163 * index --- index, 4bytes
164 * begin --- begin, 4bytes
165 * total: 13bytes
166 */
167 bittorrent::createPeerMessageString(msgHeader, MESSAGE_HEADER_LENGTH,
168 9 + blockLength_, ID);
169 bittorrent::setIntParam(&msgHeader[5], index_);
170 bittorrent::setIntParam(&msgHeader[9], begin_);
171 }
172
getMessageHeaderLength()173 size_t BtPieceMessage::getMessageHeaderLength()
174 {
175 return MESSAGE_HEADER_LENGTH;
176 }
177
178 namespace {
179 struct PieceSendUpdate : public ProgressUpdate {
PieceSendUpdatearia2::__anonb2a7f1590211::PieceSendUpdate180 PieceSendUpdate(DownloadContext* dctx, std::shared_ptr<Peer> peer,
181 size_t headerLength)
182 : dctx(dctx), peer(std::move(peer)), headerLength(headerLength)
183 {
184 }
updatearia2::__anonb2a7f1590211::PieceSendUpdate185 virtual void update(size_t length, bool complete) CXX11_OVERRIDE
186 {
187 if (headerLength > 0) {
188 size_t m = std::min(headerLength, length);
189 headerLength -= m;
190 length -= m;
191 }
192 peer->updateUploadLength(length);
193 dctx->updateUploadLength(length);
194 }
195 DownloadContext* dctx;
196 std::shared_ptr<Peer> peer;
197 size_t headerLength;
198 };
199 } // namespace
200
send()201 void BtPieceMessage::send()
202 {
203 if (isInvalidate()) {
204 return;
205 }
206 A2_LOG_INFO(fmt(MSG_SEND_PEER_MESSAGE, getCuid(),
207 getPeer()->getIPAddress().c_str(), getPeer()->getPort(),
208 toString().c_str()));
209 int64_t pieceDataOffset =
210 static_cast<int64_t>(index_) * downloadContext_->getPieceLength() +
211 begin_;
212 pushPieceData(pieceDataOffset, blockLength_);
213 }
214
pushPieceData(int64_t offset,int32_t length) const215 void BtPieceMessage::pushPieceData(int64_t offset, int32_t length) const
216 {
217 assert(length <= static_cast<int32_t>(MAX_BLOCK_LENGTH));
218 auto buf = std::vector<unsigned char>(length + MESSAGE_HEADER_LENGTH);
219 createMessageHeader(buf.data());
220 ssize_t r;
221 r = getPieceStorage()->getDiskAdaptor()->readData(
222 buf.data() + MESSAGE_HEADER_LENGTH, length, offset);
223 if (r == length) {
224 const auto& peer = getPeer();
225 getPeerConnection()->pushBytes(
226 std::move(buf), make_unique<PieceSendUpdate>(downloadContext_, peer,
227 MESSAGE_HEADER_LENGTH));
228 peer->updateUploadSpeed(length);
229 downloadContext_->updateUploadSpeed(length);
230 }
231 else {
232 throw DL_ABORT_EX(EX_DATA_READ);
233 }
234 }
235
toString() const236 std::string BtPieceMessage::toString() const
237 {
238 return fmt("%s index=%lu, begin=%d, length=%d", NAME,
239 static_cast<unsigned long>(index_), begin_, blockLength_);
240 }
241
checkPieceHash(const std::shared_ptr<Piece> & piece)242 bool BtPieceMessage::checkPieceHash(const std::shared_ptr<Piece>& piece)
243 {
244 if (!getPieceStorage()->isEndGame() && piece->isHashCalculated()) {
245 A2_LOG_DEBUG(fmt("Hash is available!! index=%lu",
246 static_cast<unsigned long>(piece->getIndex())));
247 return piece->getDigest() ==
248 downloadContext_->getPieceHash(piece->getIndex());
249 }
250 else {
251 A2_LOG_DEBUG(fmt("Calculating hash index=%lu",
252 static_cast<unsigned long>(piece->getIndex())));
253 try {
254 return piece->getDigestWithWrCache(downloadContext_->getPieceLength(),
255 getPieceStorage()->getDiskAdaptor()) ==
256 downloadContext_->getPieceHash(piece->getIndex());
257 }
258 catch (RecoverableException& e) {
259 piece->clearAllBlock(getPieceStorage()->getWrDiskCache());
260 throw;
261 }
262 }
263 }
264
onNewPiece(const std::shared_ptr<Piece> & piece)265 void BtPieceMessage::onNewPiece(const std::shared_ptr<Piece>& piece)
266 {
267 if (piece->getWrDiskCacheEntry()) {
268 // We flush cached data whenever an whole piece is retrieved.
269 piece->flushWrCache(getPieceStorage()->getWrDiskCache());
270 if (piece->getWrDiskCacheEntry()->getError() !=
271 WrDiskCacheEntry::CACHE_ERR_SUCCESS) {
272 piece->clearAllBlock(getPieceStorage()->getWrDiskCache());
273 throw DOWNLOAD_FAILURE_EXCEPTION2(
274 fmt("Write disk cache flush failure index=%lu",
275 static_cast<unsigned long>(piece->getIndex())),
276 piece->getWrDiskCacheEntry()->getErrorCode());
277 }
278 }
279 A2_LOG_INFO(fmt(MSG_GOT_NEW_PIECE, getCuid(),
280 static_cast<unsigned long>(piece->getIndex())));
281 getPieceStorage()->completePiece(piece);
282 getPieceStorage()->advertisePiece(getCuid(), piece->getIndex(),
283 global::wallclock());
284 }
285
onWrongPiece(const std::shared_ptr<Piece> & piece)286 void BtPieceMessage::onWrongPiece(const std::shared_ptr<Piece>& piece)
287 {
288 A2_LOG_INFO(fmt(MSG_GOT_WRONG_PIECE, getCuid(),
289 static_cast<unsigned long>(piece->getIndex())));
290 piece->clearAllBlock(getPieceStorage()->getWrDiskCache());
291 piece->destroyHashContext();
292 getBtRequestFactory()->removeTargetPiece(piece);
293 }
294
onChokingEvent(const BtChokingEvent & event)295 void BtPieceMessage::onChokingEvent(const BtChokingEvent& event)
296 {
297 if (!isInvalidate() && !getPeer()->isInAmAllowedIndexSet(index_)) {
298 A2_LOG_DEBUG(fmt(MSG_REJECT_PIECE_CHOKED, getCuid(),
299 static_cast<unsigned long>(index_), begin_, blockLength_));
300 if (getPeer()->isFastExtensionEnabled()) {
301 getBtMessageDispatcher()->addMessageToQueue(
302 getBtMessageFactory()->createRejectMessage(index_, begin_,
303 blockLength_));
304 }
305 setInvalidate(true);
306 }
307 }
308
onCancelSendingPieceEvent(const BtCancelSendingPieceEvent & event)309 void BtPieceMessage::onCancelSendingPieceEvent(
310 const BtCancelSendingPieceEvent& event)
311 {
312 if (!isInvalidate() && index_ == event.getIndex() &&
313 begin_ == event.getBegin() && blockLength_ == event.getLength()) {
314 A2_LOG_DEBUG(fmt(MSG_REJECT_PIECE_CANCEL, getCuid(),
315 static_cast<unsigned long>(index_), begin_, blockLength_));
316 if (getPeer()->isFastExtensionEnabled()) {
317 getBtMessageDispatcher()->addMessageToQueue(
318 getBtMessageFactory()->createRejectMessage(index_, begin_,
319 blockLength_));
320 }
321 setInvalidate(true);
322 }
323 }
324
setDownloadContext(DownloadContext * downloadContext)325 void BtPieceMessage::setDownloadContext(DownloadContext* downloadContext)
326 {
327 downloadContext_ = downloadContext;
328 }
329
setPeerStorage(PeerStorage * peerStorage)330 void BtPieceMessage::setPeerStorage(PeerStorage* peerStorage)
331 {
332 peerStorage_ = peerStorage;
333 }
334
335 } // namespace aria2
336