1 // libTorrent - BitTorrent library
2 // Copyright (C) 2005-2011, Jari Sundell
3 //
4 // This program is free software; you can redistribute it and/or modify
5 // it under the terms of the GNU General Public License as published by
6 // the Free Software Foundation; either version 2 of the License, or
7 // (at your option) any later version.
8 //
9 // This program is distributed in the hope that it will be useful,
10 // but WITHOUT ANY WARRANTY; without even the implied warranty of
11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12 // GNU General Public License for more details.
13 //
14 // You should have received a copy of the GNU General Public License
15 // along with this program; if not, write to the Free Software
16 // Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
17 //
18 // In addition, as a special exception, the copyright holders give
19 // permission to link the code of portions of this program with the
20 // OpenSSL library under certain conditions as described in each
21 // individual source file, and distribute linked combinations
22 // including the two.
23 //
24 // You must obey the GNU General Public License in all respects for
25 // all of the code used other than OpenSSL.  If you modify file(s)
26 // with this exception, you may extend this exception to your version
27 // of the file(s), but you are not obligated to do so.  If you do not
28 // wish to do so, delete this exception statement from your version.
29 // If you delete this exception statement from all source files in the
30 // program, then also delete it here.
31 //
32 // Contact:  Jari Sundell <jaris@ifi.uio.no>
33 //
34 //           Skomakerveien 33
35 //           3185 Skoppum, NORWAY
36 
37 #include "config.h"
38 
39 #include <cstring>
40 #include <sstream>
41 
42 #include "data/chunk_list_node.h"
43 #include "download/chunk_selector.h"
44 #include "download/chunk_statistics.h"
45 #include "download/download_main.h"
46 #include "torrent/dht_manager.h"
47 #include "torrent/download_info.h"
48 #include "torrent/download/choke_queue.h"
49 #include "torrent/peer/connection_list.h"
50 #include "torrent/peer/peer_info.h"
51 #include "rak/functional.h"
52 #include "torrent/utils/log.h"
53 
54 #include "extensions.h"
55 #include "peer_connection_metadata.h"
56 
// Per-connection logging helpers. Each one prefixes the message with the
// remote peer's id (this->peer_info()->id_hex()) and logs against this
// connection's download info, so they may only be used inside member
// functions. At least one variadic argument is required; call sites pass a
// dummy 0 when the format string has no conversions of its own.
//
// The bodies are wrapped in do { } while (0) so that an invocation is exactly
// one statement and the caller supplies the terminating semicolon. This keeps
// the macros usable in an unbraced if/else.
#define LT_LOG_METADATA_EVENTS(log_fmt, ...)                            \
  do { lt_log_print_info(LOG_PROTOCOL_METADATA_EVENTS, this->download()->info(), "metadata_events", "%40s " log_fmt, this->peer_info()->id_hex(), __VA_ARGS__); } while (0)
#define LT_LOG_STORAGE_ERRORS(log_fmt, ...)                              \
  do { lt_log_print_info(LOG_PROTOCOL_STORAGE_ERRORS, this->download()->info(), "storage_errors", "%40s " log_fmt, this->peer_info()->id_hex(), __VA_ARGS__); } while (0)
61 
62 namespace torrent {
63 
~PeerConnectionMetadata()64 PeerConnectionMetadata::~PeerConnectionMetadata() {
65 }
66 
67 void
initialize_custom()68 PeerConnectionMetadata::initialize_custom() {
69 }
70 
71 void
update_interested()72 PeerConnectionMetadata::update_interested() {
73 }
74 
75 bool
receive_keepalive()76 PeerConnectionMetadata::receive_keepalive() {
77   if (cachedTime - m_timeLastRead > rak::timer::from_seconds(240))
78     return false;
79 
80   m_tryRequest = true;
81 
82   // There's no point in adding ourselves to the write poll if the
83   // buffer is full, as that will already have been taken care of.
84   if (m_up->get_state() == ProtocolWrite::IDLE &&
85       m_up->can_write_keepalive()) {
86 
87     write_insert_poll_safe();
88 
89     ProtocolBuffer<512>::iterator old_end = m_up->buffer()->end();
90     m_up->write_keepalive();
91 
92     if (is_encrypted())
93       m_encryption.encrypt(old_end, m_up->buffer()->end() - old_end);
94   }
95 
96   return true;
97 }
98 
99 // We keep the message in the buffer if it is incomplete instead of
100 // keeping the state and remembering the read information. This
101 // shouldn't happen very often compared to full reads.
102 inline bool
read_message()103 PeerConnectionMetadata::read_message() {
104   ProtocolBuffer<512>* buf = m_down->buffer();
105 
106   if (buf->remaining() < 4)
107     return false;
108 
109   // Remember the start of the message so we may reset it if we don't
110   // have the whole message.
111   ProtocolBuffer<512>::iterator beginning = buf->position();
112 
113   uint32_t length = buf->read_32();
114 
115   if (length == 0) {
116     // Keepalive message.
117     m_down->set_last_command(ProtocolBase::KEEP_ALIVE);
118 
119     return true;
120 
121   } else if (buf->remaining() < 1) {
122     buf->set_position_itr(beginning);
123     return false;
124 
125   } else if (length > (1 << 20)) {
126     throw communication_error("PeerConnection::read_message() got an invalid message length.");
127   }
128 
129   m_down->set_last_command((ProtocolBase::Protocol)buf->peek_8());
130 
131   // Ignore most messages, they aren't relevant for a metadata download.
132   switch (buf->read_8()) {
133   case ProtocolBase::CHOKE:
134   case ProtocolBase::UNCHOKE:
135   case ProtocolBase::INTERESTED:
136   case ProtocolBase::NOT_INTERESTED:
137     return true;
138 
139   case ProtocolBase::HAVE:
140     if (!m_down->can_read_have_body())
141       break;
142 
143     buf->read_32();
144     return true;
145 
146   case ProtocolBase::REQUEST:
147     if (!m_down->can_read_request_body())
148       break;
149 
150     m_down->read_request();
151     return true;
152 
153   case ProtocolBase::PIECE:
154     throw communication_error("Received a piece but the connection is strictly for meta data.");
155 
156   case ProtocolBase::CANCEL:
157     if (!m_down->can_read_cancel_body())
158       break;
159 
160     m_down->read_request();
161     return true;
162 
163   case ProtocolBase::PORT:
164     if (!m_down->can_read_port_body())
165       break;
166 
167     manager->dht_manager()->add_node(m_peerInfo->socket_address(), m_down->buffer()->read_16());
168     return true;
169 
170   case ProtocolBase::EXTENSION_PROTOCOL:
171     LT_LOG_METADATA_EVENTS("protocol extension message", 0);
172 
173     if (!m_down->can_read_extension_body())
174       break;
175 
176     if (m_extensions->is_default()) {
177       m_extensions = new ProtocolExtension();
178       m_extensions->set_info(m_peerInfo, m_download);
179     }
180 
181     {
182       int extension = m_down->buffer()->read_8();
183       m_extensions->read_start(extension, length - 2, (extension == ProtocolExtension::UT_PEX) && !m_download->want_pex_msg());
184       m_down->set_state(ProtocolRead::READ_EXTENSION);
185     }
186 
187     if (!down_extension())
188       return false;
189 
190     LT_LOG_METADATA_EVENTS("protocol extension done", 0);
191 
192     // Drop peer if it disabled the metadata extension.
193     if (!m_extensions->is_remote_supported(ProtocolExtension::UT_METADATA))
194       throw close_connection();
195 
196     m_down->set_state(ProtocolRead::IDLE);
197     m_tryRequest = true;
198     write_insert_poll_safe();
199 
200     return true;
201 
202   case ProtocolBase::BITFIELD:
203     // Discard the bitfield sent by the peer.
204     m_skipLength = length - 1;
205     m_down->set_state(ProtocolRead::READ_SKIP_PIECE);
206     return false;
207 
208   default:
209     throw communication_error("Received unsupported message type.");
210   }
211 
212   // We were unsuccessfull in reading the message, need more data.
213   buf->set_position_itr(beginning);
214   return false;
215 }
216 
217 void
event_read()218 PeerConnectionMetadata::event_read() {
219   m_timeLastRead = cachedTime;
220 
221   // Need to make sure ProtocolBuffer::end() is pointing to the end of
222   // the unread data, and that the unread data starts from the
223   // beginning of the buffer. Or do we use position? Propably best,
224   // therefor ProtocolBuffer::position() points to the beginning of
225   // the unused data.
226 
227   try {
228 
229     // Normal read.
230     //
231     // We rarely will read zero bytes as the read of 64 bytes will
232     // almost always either not fill up or it will require additional
233     // reads.
234     //
235     // Only loop when end hits 64.
236 
237     do {
238       switch (m_down->get_state()) {
239       case ProtocolRead::IDLE:
240         if (m_down->buffer()->size_end() < read_size) {
241           unsigned int length = read_stream_throws(m_down->buffer()->end(), read_size - m_down->buffer()->size_end());
242           m_down->throttle()->node_used_unthrottled(length);
243 
244           if (is_encrypted())
245             m_encryption.decrypt(m_down->buffer()->end(), length);
246 
247           m_down->buffer()->move_end(length);
248         }
249 
250         while (read_message());
251 
252         if (m_down->buffer()->size_end() == read_size) {
253           m_down->buffer()->move_unused();
254           break;
255         } else {
256           m_down->buffer()->move_unused();
257           return;
258         }
259 
260       case ProtocolRead::READ_EXTENSION:
261         if (!down_extension())
262           return;
263 
264         // Drop peer if it disabled the metadata extension.
265         if (!m_extensions->is_remote_supported(ProtocolExtension::UT_METADATA))
266           throw close_connection();
267 
268         LT_LOG_METADATA_EVENTS("reading extension message", 0);
269 
270         m_down->set_state(ProtocolRead::IDLE);
271         m_tryRequest = true;
272         write_insert_poll_safe();
273         break;
274 
275       // Actually skipping the bitfield.
276       // We never receive normal piece messages anyway.
277       case ProtocolRead::READ_SKIP_PIECE:
278         if (!read_skip_bitfield())
279           return;
280 
281         m_down->set_state(ProtocolRead::IDLE);
282         break;
283 
284       default:
285         throw internal_error("PeerConnection::event_read() wrong state.");
286       }
287 
288       // Figure out how to get rid of the shouldLoop boolean.
289     } while (true);
290 
291   // Exception handlers:
292 
293   } catch (close_connection& e) {
294     m_download->connection_list()->erase(this, 0);
295 
296   } catch (blocked_connection& e) {
297     m_download->connection_list()->erase(this, 0);
298 
299   } catch (network_error& e) {
300     m_download->connection_list()->erase(this, 0);
301 
302   } catch (storage_error& e) {
303     LT_LOG_STORAGE_ERRORS("read error: %s", e.what());
304     m_download->connection_list()->erase(this, 0);
305 
306   } catch (base_error& e) {
307     std::stringstream s;
308     s << "Connection read fd(" << get_fd().get_fd() << ',' << m_down->get_state() << ',' << m_down->last_command() << ") \"" << e.what() << '"';
309 
310     throw internal_error(s.str());
311   }
312 }
313 
314 inline void
fill_write_buffer()315 PeerConnectionMetadata::fill_write_buffer() {
316   ProtocolBuffer<512>::iterator old_end = m_up->buffer()->end();
317 
318   if (m_tryRequest)
319     m_tryRequest = try_request_metadata_pieces();
320 
321   if (m_sendPEXMask && m_up->can_write_extension() &&
322       send_pex_message()) {
323     // Don't do anything else if send_pex_message() succeeded.
324 
325   } else if (m_extensions->has_pending_message() && m_up->can_write_extension() &&
326              send_ext_message()) {
327     // Same.
328   }
329 
330   if (is_encrypted())
331     m_encryption.encrypt(old_end, m_up->buffer()->end() - old_end);
332 }
333 
334 void
event_write()335 PeerConnectionMetadata::event_write() {
336   try {
337 
338     do {
339 
340       switch (m_up->get_state()) {
341       case ProtocolWrite::IDLE:
342 
343         fill_write_buffer();
344 
345         if (m_up->buffer()->remaining() == 0) {
346           manager->poll()->remove_write(this);
347           return;
348         }
349 
350         m_up->set_state(ProtocolWrite::MSG);
351 
352       case ProtocolWrite::MSG:
353         if (!m_up->buffer()->consume(m_up->throttle()->node_used_unthrottled(write_stream_throws(m_up->buffer()->position(),
354                                                                                                  m_up->buffer()->remaining()))))
355           return;
356 
357         m_up->buffer()->reset();
358 
359         if (m_up->last_command() != ProtocolBase::EXTENSION_PROTOCOL) {
360           m_up->set_state(ProtocolWrite::IDLE);
361           break;
362         }
363 
364         m_up->set_state(ProtocolWrite::WRITE_EXTENSION);
365 
366       case ProtocolWrite::WRITE_EXTENSION:
367         if (!up_extension())
368           return;
369 
370         m_up->set_state(ProtocolWrite::IDLE);
371         break;
372 
373       default:
374         throw internal_error("PeerConnection::event_write() wrong state.");
375       }
376 
377     } while (true);
378 
379   } catch (close_connection& e) {
380     m_download->connection_list()->erase(this, 0);
381 
382   } catch (blocked_connection& e) {
383     m_download->connection_list()->erase(this, 0);
384 
385   } catch (network_error& e) {
386     m_download->connection_list()->erase(this, 0);
387 
388   } catch (storage_error& e) {
389     LT_LOG_STORAGE_ERRORS("read error: %s", e.what());
390     m_download->connection_list()->erase(this, 0);
391 
392   } catch (base_error& e) {
393     std::stringstream s;
394     s << "Connection write fd(" << get_fd().get_fd() << ',' << m_up->get_state() << ',' << m_up->last_command() << ") \"" << e.what() << '"';
395 
396     throw internal_error(s.str());
397   }
398 }
399 
400 bool
read_skip_bitfield()401 PeerConnectionMetadata::read_skip_bitfield() {
402   if (m_down->buffer()->remaining()) {
403     uint32_t length = std::min(m_skipLength, (uint32_t)m_down->buffer()->remaining());
404     m_down->buffer()->consume(length);
405     m_skipLength -= length;
406   }
407 
408   if (m_skipLength) {
409     uint32_t length = std::min(m_skipLength, (uint32_t)null_buffer_size);
410     length = read_stream_throws(m_nullBuffer, length);
411     if (!length)
412       return false;
413     m_skipLength -= length;
414   }
415 
416   return !m_skipLength;
417 }
418 
419 // Same as the PCB code, but only one at a time and with the extension protocol.
420 bool
try_request_metadata_pieces()421 PeerConnectionMetadata::try_request_metadata_pieces() {
422   if (m_download->file_list()->chunk_size() == 1 || !m_extensions->is_remote_supported(ProtocolExtension::UT_METADATA))
423     return false;
424 
425   if (request_list()->queued_empty())
426     m_downStall = 0;
427 
428   uint32_t pipeSize = request_list()->calculate_pipe_size(m_peerChunks.download_throttle()->rate()->rate());
429 
430   // Don't start requesting if we can't do it in large enough chunks.
431   if (request_list()->pipe_size() >= (pipeSize + 10) / 2)
432     return false;
433 
434   // DEBUG:
435 //   if (!request_list()->queued_size() < pipeSize || !m_up->can_write_extension() ||
436   if (!m_up->can_write_extension() || m_extensions->has_pending_message())
437     return false;
438 
439   const Piece* p = request_list()->delegate();
440 
441   if (p == NULL)
442     return false;
443 
444   if (!m_download->file_list()->is_valid_piece(*p) || !m_peerChunks.bitfield()->get(p->index()))
445     throw internal_error("PeerConnectionMetadata::try_request_metadata_pieces() tried to use an invalid piece.");
446 
447   // DEBUG:
448   if (m_extensions->request_metadata_piece(p)) {
449     LT_LOG_METADATA_EVENTS("request metadata piece succeded", 0);
450     return true;
451   } else {
452     LT_LOG_METADATA_EVENTS("request metadata piece failed", 0);
453     return false;
454   }
455 }
456 
457 void
receive_metadata_piece(uint32_t piece,const char * data,uint32_t length)458 PeerConnectionMetadata::receive_metadata_piece(uint32_t piece, const char* data, uint32_t length) {
459   if (data == NULL) {
460     // Length is not set in a reject message.
461     length = ProtocolExtension::metadata_piece_size;
462 
463     if ((piece << ProtocolExtension::metadata_piece_shift) + ProtocolExtension::metadata_piece_size >= m_download->file_list()->size_bytes())
464       length = m_download->file_list()->chunk_size() % ProtocolExtension::metadata_piece_size;
465 
466     m_tryRequest = false;
467     read_cancel_piece(Piece(0, piece << ProtocolExtension::metadata_piece_shift, length));
468 
469     LT_LOG_METADATA_EVENTS("rejected metadata piece", 0);
470     return;
471   }
472 
473   if (!down_chunk_start(Piece(0, piece << ProtocolExtension::metadata_piece_shift, length))) {
474     LT_LOG_METADATA_EVENTS("skipped metadata piece", 0);
475     down_chunk_skip_process(data, length);
476   } else {
477     LT_LOG_METADATA_EVENTS("processed metadata piece", 0);
478     down_chunk_process(data, length);
479   }
480 
481   if (m_request_list.transfer() != NULL && !m_request_list.transfer()->is_finished())
482     throw internal_error("PeerConnectionMetadata::receive_metadata_piece did not have complete piece.");
483 
484   m_tryRequest = true;
485   down_chunk_finished();
486 }
487 
488 }
489