1 // libTorrent - BitTorrent library
2 // Copyright (C) 2005-2011, Jari Sundell
3 //
4 // This program is free software; you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation; either version 2 of the License, or
7 // (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17 //
18 // In addition, as a special exception, the copyright holders give
19 // permission to link the code of portions of this program with the
20 // OpenSSL library under certain conditions as described in each
21 // individual source file, and distribute linked combinations
22 // including the two.
23 //
24 // You must obey the GNU General Public License in all respects for
25 // all of the code used other than OpenSSL. If you modify file(s)
26 // with this exception, you may extend this exception to your version
27 // of the file(s), but you are not obligated to do so. If you do not
28 // wish to do so, delete this exception statement from your version.
29 // If you delete this exception statement from all source files in the
30 // program, then also delete it here.
31 //
32 // Contact: Jari Sundell <jaris@ifi.uio.no>
33 //
34 // Skomakerveien 33
35 // 3185 Skoppum, NORWAY
36
37 #include "config.h"
38
39 #include <cstring>
40 #include <sstream>
41
42 #include "data/chunk_list_node.h"
43 #include "download/chunk_selector.h"
44 #include "download/chunk_statistics.h"
45 #include "download/download_main.h"
46 #include "torrent/dht_manager.h"
47 #include "torrent/download_info.h"
48 #include "torrent/download/choke_queue.h"
49 #include "torrent/peer/connection_list.h"
50 #include "torrent/peer/peer_info.h"
51 #include "rak/functional.h"
52 #include "torrent/utils/log.h"
53
54 #include "extensions.h"
55 #include "peer_connection_metadata.h"
56
57 #define LT_LOG_METADATA_EVENTS(log_fmt, ...) \
58 lt_log_print_info(LOG_PROTOCOL_METADATA_EVENTS, this->download()->info(), "metadata_events", "%40s " log_fmt, this->peer_info()->id_hex(), __VA_ARGS__);
59 #define LT_LOG_STORAGE_ERRORS(log_fmt, ...) \
60 lt_log_print_info(LOG_PROTOCOL_STORAGE_ERRORS, this->download()->info(), "storage_errors", "%40s " log_fmt, this->peer_info()->id_hex(), __VA_ARGS__);
61
62 namespace torrent {
63
~PeerConnectionMetadata()64 PeerConnectionMetadata::~PeerConnectionMetadata() {
65 }
66
67 void
initialize_custom()68 PeerConnectionMetadata::initialize_custom() {
69 }
70
71 void
update_interested()72 PeerConnectionMetadata::update_interested() {
73 }
74
75 bool
receive_keepalive()76 PeerConnectionMetadata::receive_keepalive() {
77 if (cachedTime - m_timeLastRead > rak::timer::from_seconds(240))
78 return false;
79
80 m_tryRequest = true;
81
82 // There's no point in adding ourselves to the write poll if the
83 // buffer is full, as that will already have been taken care of.
84 if (m_up->get_state() == ProtocolWrite::IDLE &&
85 m_up->can_write_keepalive()) {
86
87 write_insert_poll_safe();
88
89 ProtocolBuffer<512>::iterator old_end = m_up->buffer()->end();
90 m_up->write_keepalive();
91
92 if (is_encrypted())
93 m_encryption.encrypt(old_end, m_up->buffer()->end() - old_end);
94 }
95
96 return true;
97 }
98
99 // We keep the message in the buffer if it is incomplete instead of
100 // keeping the state and remembering the read information. This
101 // shouldn't happen very often compared to full reads.
102 inline bool
read_message()103 PeerConnectionMetadata::read_message() {
104 ProtocolBuffer<512>* buf = m_down->buffer();
105
106 if (buf->remaining() < 4)
107 return false;
108
109 // Remember the start of the message so we may reset it if we don't
110 // have the whole message.
111 ProtocolBuffer<512>::iterator beginning = buf->position();
112
113 uint32_t length = buf->read_32();
114
115 if (length == 0) {
116 // Keepalive message.
117 m_down->set_last_command(ProtocolBase::KEEP_ALIVE);
118
119 return true;
120
121 } else if (buf->remaining() < 1) {
122 buf->set_position_itr(beginning);
123 return false;
124
125 } else if (length > (1 << 20)) {
126 throw communication_error("PeerConnection::read_message() got an invalid message length.");
127 }
128
129 m_down->set_last_command((ProtocolBase::Protocol)buf->peek_8());
130
131 // Ignore most messages, they aren't relevant for a metadata download.
132 switch (buf->read_8()) {
133 case ProtocolBase::CHOKE:
134 case ProtocolBase::UNCHOKE:
135 case ProtocolBase::INTERESTED:
136 case ProtocolBase::NOT_INTERESTED:
137 return true;
138
139 case ProtocolBase::HAVE:
140 if (!m_down->can_read_have_body())
141 break;
142
143 buf->read_32();
144 return true;
145
146 case ProtocolBase::REQUEST:
147 if (!m_down->can_read_request_body())
148 break;
149
150 m_down->read_request();
151 return true;
152
153 case ProtocolBase::PIECE:
154 throw communication_error("Received a piece but the connection is strictly for meta data.");
155
156 case ProtocolBase::CANCEL:
157 if (!m_down->can_read_cancel_body())
158 break;
159
160 m_down->read_request();
161 return true;
162
163 case ProtocolBase::PORT:
164 if (!m_down->can_read_port_body())
165 break;
166
167 manager->dht_manager()->add_node(m_peerInfo->socket_address(), m_down->buffer()->read_16());
168 return true;
169
170 case ProtocolBase::EXTENSION_PROTOCOL:
171 LT_LOG_METADATA_EVENTS("protocol extension message", 0);
172
173 if (!m_down->can_read_extension_body())
174 break;
175
176 if (m_extensions->is_default()) {
177 m_extensions = new ProtocolExtension();
178 m_extensions->set_info(m_peerInfo, m_download);
179 }
180
181 {
182 int extension = m_down->buffer()->read_8();
183 m_extensions->read_start(extension, length - 2, (extension == ProtocolExtension::UT_PEX) && !m_download->want_pex_msg());
184 m_down->set_state(ProtocolRead::READ_EXTENSION);
185 }
186
187 if (!down_extension())
188 return false;
189
190 LT_LOG_METADATA_EVENTS("protocol extension done", 0);
191
192 // Drop peer if it disabled the metadata extension.
193 if (!m_extensions->is_remote_supported(ProtocolExtension::UT_METADATA))
194 throw close_connection();
195
196 m_down->set_state(ProtocolRead::IDLE);
197 m_tryRequest = true;
198 write_insert_poll_safe();
199
200 return true;
201
202 case ProtocolBase::BITFIELD:
203 // Discard the bitfield sent by the peer.
204 m_skipLength = length - 1;
205 m_down->set_state(ProtocolRead::READ_SKIP_PIECE);
206 return false;
207
208 default:
209 throw communication_error("Received unsupported message type.");
210 }
211
212 // We were unsuccessfull in reading the message, need more data.
213 buf->set_position_itr(beginning);
214 return false;
215 }
216
217 void
event_read()218 PeerConnectionMetadata::event_read() {
219 m_timeLastRead = cachedTime;
220
221 // Need to make sure ProtocolBuffer::end() is pointing to the end of
222 // the unread data, and that the unread data starts from the
223 // beginning of the buffer. Or do we use position? Propably best,
224 // therefor ProtocolBuffer::position() points to the beginning of
225 // the unused data.
226
227 try {
228
229 // Normal read.
230 //
231 // We rarely will read zero bytes as the read of 64 bytes will
232 // almost always either not fill up or it will require additional
233 // reads.
234 //
235 // Only loop when end hits 64.
236
237 do {
238 switch (m_down->get_state()) {
239 case ProtocolRead::IDLE:
240 if (m_down->buffer()->size_end() < read_size) {
241 unsigned int length = read_stream_throws(m_down->buffer()->end(), read_size - m_down->buffer()->size_end());
242 m_down->throttle()->node_used_unthrottled(length);
243
244 if (is_encrypted())
245 m_encryption.decrypt(m_down->buffer()->end(), length);
246
247 m_down->buffer()->move_end(length);
248 }
249
250 while (read_message());
251
252 if (m_down->buffer()->size_end() == read_size) {
253 m_down->buffer()->move_unused();
254 break;
255 } else {
256 m_down->buffer()->move_unused();
257 return;
258 }
259
260 case ProtocolRead::READ_EXTENSION:
261 if (!down_extension())
262 return;
263
264 // Drop peer if it disabled the metadata extension.
265 if (!m_extensions->is_remote_supported(ProtocolExtension::UT_METADATA))
266 throw close_connection();
267
268 LT_LOG_METADATA_EVENTS("reading extension message", 0);
269
270 m_down->set_state(ProtocolRead::IDLE);
271 m_tryRequest = true;
272 write_insert_poll_safe();
273 break;
274
275 // Actually skipping the bitfield.
276 // We never receive normal piece messages anyway.
277 case ProtocolRead::READ_SKIP_PIECE:
278 if (!read_skip_bitfield())
279 return;
280
281 m_down->set_state(ProtocolRead::IDLE);
282 break;
283
284 default:
285 throw internal_error("PeerConnection::event_read() wrong state.");
286 }
287
288 // Figure out how to get rid of the shouldLoop boolean.
289 } while (true);
290
291 // Exception handlers:
292
293 } catch (close_connection& e) {
294 m_download->connection_list()->erase(this, 0);
295
296 } catch (blocked_connection& e) {
297 m_download->connection_list()->erase(this, 0);
298
299 } catch (network_error& e) {
300 m_download->connection_list()->erase(this, 0);
301
302 } catch (storage_error& e) {
303 LT_LOG_STORAGE_ERRORS("read error: %s", e.what());
304 m_download->connection_list()->erase(this, 0);
305
306 } catch (base_error& e) {
307 std::stringstream s;
308 s << "Connection read fd(" << get_fd().get_fd() << ',' << m_down->get_state() << ',' << m_down->last_command() << ") \"" << e.what() << '"';
309
310 throw internal_error(s.str());
311 }
312 }
313
314 inline void
fill_write_buffer()315 PeerConnectionMetadata::fill_write_buffer() {
316 ProtocolBuffer<512>::iterator old_end = m_up->buffer()->end();
317
318 if (m_tryRequest)
319 m_tryRequest = try_request_metadata_pieces();
320
321 if (m_sendPEXMask && m_up->can_write_extension() &&
322 send_pex_message()) {
323 // Don't do anything else if send_pex_message() succeeded.
324
325 } else if (m_extensions->has_pending_message() && m_up->can_write_extension() &&
326 send_ext_message()) {
327 // Same.
328 }
329
330 if (is_encrypted())
331 m_encryption.encrypt(old_end, m_up->buffer()->end() - old_end);
332 }
333
334 void
event_write()335 PeerConnectionMetadata::event_write() {
336 try {
337
338 do {
339
340 switch (m_up->get_state()) {
341 case ProtocolWrite::IDLE:
342
343 fill_write_buffer();
344
345 if (m_up->buffer()->remaining() == 0) {
346 manager->poll()->remove_write(this);
347 return;
348 }
349
350 m_up->set_state(ProtocolWrite::MSG);
351
352 case ProtocolWrite::MSG:
353 if (!m_up->buffer()->consume(m_up->throttle()->node_used_unthrottled(write_stream_throws(m_up->buffer()->position(),
354 m_up->buffer()->remaining()))))
355 return;
356
357 m_up->buffer()->reset();
358
359 if (m_up->last_command() != ProtocolBase::EXTENSION_PROTOCOL) {
360 m_up->set_state(ProtocolWrite::IDLE);
361 break;
362 }
363
364 m_up->set_state(ProtocolWrite::WRITE_EXTENSION);
365
366 case ProtocolWrite::WRITE_EXTENSION:
367 if (!up_extension())
368 return;
369
370 m_up->set_state(ProtocolWrite::IDLE);
371 break;
372
373 default:
374 throw internal_error("PeerConnection::event_write() wrong state.");
375 }
376
377 } while (true);
378
379 } catch (close_connection& e) {
380 m_download->connection_list()->erase(this, 0);
381
382 } catch (blocked_connection& e) {
383 m_download->connection_list()->erase(this, 0);
384
385 } catch (network_error& e) {
386 m_download->connection_list()->erase(this, 0);
387
388 } catch (storage_error& e) {
389 LT_LOG_STORAGE_ERRORS("read error: %s", e.what());
390 m_download->connection_list()->erase(this, 0);
391
392 } catch (base_error& e) {
393 std::stringstream s;
394 s << "Connection write fd(" << get_fd().get_fd() << ',' << m_up->get_state() << ',' << m_up->last_command() << ") \"" << e.what() << '"';
395
396 throw internal_error(s.str());
397 }
398 }
399
400 bool
read_skip_bitfield()401 PeerConnectionMetadata::read_skip_bitfield() {
402 if (m_down->buffer()->remaining()) {
403 uint32_t length = std::min(m_skipLength, (uint32_t)m_down->buffer()->remaining());
404 m_down->buffer()->consume(length);
405 m_skipLength -= length;
406 }
407
408 if (m_skipLength) {
409 uint32_t length = std::min(m_skipLength, (uint32_t)null_buffer_size);
410 length = read_stream_throws(m_nullBuffer, length);
411 if (!length)
412 return false;
413 m_skipLength -= length;
414 }
415
416 return !m_skipLength;
417 }
418
419 // Same as the PCB code, but only one at a time and with the extension protocol.
420 bool
try_request_metadata_pieces()421 PeerConnectionMetadata::try_request_metadata_pieces() {
422 if (m_download->file_list()->chunk_size() == 1 || !m_extensions->is_remote_supported(ProtocolExtension::UT_METADATA))
423 return false;
424
425 if (request_list()->queued_empty())
426 m_downStall = 0;
427
428 uint32_t pipeSize = request_list()->calculate_pipe_size(m_peerChunks.download_throttle()->rate()->rate());
429
430 // Don't start requesting if we can't do it in large enough chunks.
431 if (request_list()->pipe_size() >= (pipeSize + 10) / 2)
432 return false;
433
434 // DEBUG:
435 // if (!request_list()->queued_size() < pipeSize || !m_up->can_write_extension() ||
436 if (!m_up->can_write_extension() || m_extensions->has_pending_message())
437 return false;
438
439 const Piece* p = request_list()->delegate();
440
441 if (p == NULL)
442 return false;
443
444 if (!m_download->file_list()->is_valid_piece(*p) || !m_peerChunks.bitfield()->get(p->index()))
445 throw internal_error("PeerConnectionMetadata::try_request_metadata_pieces() tried to use an invalid piece.");
446
447 // DEBUG:
448 if (m_extensions->request_metadata_piece(p)) {
449 LT_LOG_METADATA_EVENTS("request metadata piece succeded", 0);
450 return true;
451 } else {
452 LT_LOG_METADATA_EVENTS("request metadata piece failed", 0);
453 return false;
454 }
455 }
456
457 void
receive_metadata_piece(uint32_t piece,const char * data,uint32_t length)458 PeerConnectionMetadata::receive_metadata_piece(uint32_t piece, const char* data, uint32_t length) {
459 if (data == NULL) {
460 // Length is not set in a reject message.
461 length = ProtocolExtension::metadata_piece_size;
462
463 if ((piece << ProtocolExtension::metadata_piece_shift) + ProtocolExtension::metadata_piece_size >= m_download->file_list()->size_bytes())
464 length = m_download->file_list()->chunk_size() % ProtocolExtension::metadata_piece_size;
465
466 m_tryRequest = false;
467 read_cancel_piece(Piece(0, piece << ProtocolExtension::metadata_piece_shift, length));
468
469 LT_LOG_METADATA_EVENTS("rejected metadata piece", 0);
470 return;
471 }
472
473 if (!down_chunk_start(Piece(0, piece << ProtocolExtension::metadata_piece_shift, length))) {
474 LT_LOG_METADATA_EVENTS("skipped metadata piece", 0);
475 down_chunk_skip_process(data, length);
476 } else {
477 LT_LOG_METADATA_EVENTS("processed metadata piece", 0);
478 down_chunk_process(data, length);
479 }
480
481 if (m_request_list.transfer() != NULL && !m_request_list.transfer()->is_finished())
482 throw internal_error("PeerConnectionMetadata::receive_metadata_piece did not have complete piece.");
483
484 m_tryRequest = true;
485 down_chunk_finished();
486 }
487
488 }
489