1 /*
2 * sendrecv.cpp
3 * most of the swift's state machine
4 *
5 * Created by Victor Grishchenko on 3/6/09.
6 * Copyright 2009-2016 TECHNISCHE UNIVERSITEIT DELFT. All rights reserved.
7 *
8 */
9 // Arno, 2013-06-11: Must come first to ensure SIZE_MAX etc are defined
10 #include "compat.h"
11 #include "bin_utils.h"
12 #include "swift.h"
13 #include <algorithm> // kill it
14 #include <cassert>
15 #include <cfloat>
16 #include <sstream>
17
18 using namespace swift;
19 using namespace std;
20
21 struct event_base *Channel::evbase;
22 struct event Channel::evrecv;
23
24 #define DEBUGTRAFFIC 1
25
26 /** Arno: Victor's design allows a sender to choose some data to push to
27 * a receiver, if that receiver is not HINTing at data. Should be disabled
28 * when the receiver has a download rate limit.
29 */
30 #define ENABLE_SENDERSIZE_PUSH 0
31
32
33 #define ENABLE_CANCEL 0
34
35
36 /** Arno, 2011-11-24: When rate limit is on and the download is in progress
37 * we send HINTs for 2 chunks at the moment. This constant can be used to
38 * get greater granularity. Set to 0 for original behavior.
39 * Ric: set to 2, it should be smaller than 4 or we need to change the hint
40 * strategy first
41 * Ric: 2013-05: We need to always send out at least one req. for each channel.
42 * Otherwise, if the link between two peers is heavily congested or has a high
43 * pkt loss rate, ledbat will close the connection. (short explanation)
44 */
45 #define HINT_GRANULARITY 1 // chunks
46
47
48
AddRequiredHashes(struct evbuffer * evb,bin_t pos,bool isretransmit)49 void Channel::AddRequiredHashes(struct evbuffer *evb, bin_t pos, bool isretransmit)
50 {
51 // We're either seeder or know what content integrity protection method
52 // is because seeder told us, so hs_out_ is guiding here so we can
53 // send peak hashes in first datagram.
54 // TODO: adjust DefaultHandshake from which hs_out_ is derived when
55 // an authoritative source proves the CIPM is UNIFIED_MERKLE.
56 //
57 if (transfer()->ttype() == FILE_TRANSFER) {
58 // Arno, 2013-02-25: Need to send peak bins always (also CIPM None)
59 // to cold clients to communicate tree size
60 if (ack_in_.is_empty() && hashtree() != NULL && hashtree()->peak_count() > 0) {
61 AddUnsignedPeakHashes(evb);
62 }
63
64 if (hs_in_->cont_int_prot_ == POPT_CONT_INT_PROT_MERKLE) {
65 if (pos != bin_t::NONE)
66 AddFileUncleHashes(evb,pos);
67 }
68 } else {
69 // LIVE
70 if (PeerIsSource())
71 return;
72
73 // See if there is a first, or new signed munro to send
74 bin_t munro = bin_t::NONE;
75 if (hs_out_->cont_int_prot_ == POPT_CONT_INT_PROT_NONE) {
76 // No content integrity protection, so just report current pos as
77 // source pos == munro
78 LivePiecePicker *lpp = (LivePiecePicker *)transfer()->picker();
79 if (lpp == NULL) {
80 // I am source
81 LiveTransfer *lt = (LiveTransfer *)transfer();
82 munro = lt->GetSourceCurrentPos();
83 } else
84 munro = lpp->GetCurrentPos();
85 } else { //POPT_CONT_INT_PROT_UNIFIED_MERKLE
86 LiveHashTree *umt = (LiveHashTree *)hashtree();
87 if (pos == bin_t::NONE) {
88 // Initially send last signed munro
89 munro = umt->GetLastMunro();
90 } else {
91 // After, send munro required for pos
92 munro = umt->GetMunro(pos);
93 if (munro == bin_t::NONE)
94 return;
95 }
96 }
97
98 if (pos == bin_t::NONE) {
99 // Initially send last signed munro
100 dprintf("%s #%" PRIu32 " last munro %s\n",tintstr(),id_,munro.str().c_str());
101 bool ahead=false;
102 if (ack_in_right_basebin_ != bin_t::NONE) {
103 if (ack_in_right_basebin_ > munro.base_right())
104 ahead = true;
105 }
106 // Don't send when peer has chunks in range, or when it's downloading from us (e.g. chunks earlier than munro)
107 if (munro != bin_t::NONE && ack_in_.is_empty(munro) && !munro_ack_rcvd_ && !ahead) {
108 AddLiveSignedMunroHash(evb,munro);
109 last_sent_munro_ = munro;
110 }
111 } else {
112 // After, send munro required for pos
113 // Don't repeat if same and not retransmit
114 bool diff = (munro != last_sent_munro_);
115 last_sent_munro_ = munro;
116
117 dprintf("%s #%" PRIu32 " munro for %s is %s\n",tintstr(),id_,pos.str().c_str(), munro.str().c_str());
118
119 if (isretransmit || diff)
120 AddLiveSignedMunroHash(evb,munro);
121
122 if (hs_in_->cont_int_prot_ == POPT_CONT_INT_PROT_UNIFIED_MERKLE)
123 AddLiveUncleHashes(evb,pos,munro,isretransmit);
124 }
125 }
126 }
127
128
AddUnsignedPeakHashes(struct evbuffer * evb)129 void Channel::AddUnsignedPeakHashes(struct evbuffer *evb)
130 {
131 for (int i=0; i<hashtree()->peak_count(); i++) {
132 bin_t peak = hashtree()->peak(i);
133 evbuffer_add_8(evb, SWIFT_INTEGRITY);
134 evbuffer_add_chunkaddr(evb,peak,hs_out_->chunk_addr_);
135 evbuffer_add_hash(evb, hashtree()->peak_hash(i));
136 dprintf("%s #%" PRIu32 " +phash %s\n",tintstr(),id_,peak.str().c_str());
137 }
138 }
139
140
141 // SIGNPEAK
AddLiveSignedMunroHash(struct evbuffer * evb,bin_t munro)142 void Channel::AddLiveSignedMunroHash(struct evbuffer *evb, bin_t munro)
143 {
144 BinHashSigTuple bhst = BinHashSigTuple::NOBULL;
145 if (hs_out_->cont_int_prot_ == POPT_CONT_INT_PROT_NONE) {
146 // No content integrity protection, create dummy signature tuple to send
147 uint8_t dummysigdata[SWIFT_CIPM_NONE_SIGLEN];
148 memset(dummysigdata,0,SWIFT_CIPM_NONE_SIGLEN);
149 Signature sig(dummysigdata, SWIFT_CIPM_NONE_SIGLEN);
150 SigTintTuple sigtint(sig, NOW);
151 bhst = BinHashSigTuple(munro,Sha1Hash::ZERO,sigtint);
152 } else {
153 // Send signed munro hash
154 LiveHashTree *umt = (LiveHashTree *)hashtree();
155 bhst = umt->GetSignedMunro(munro);
156 }
157 if (bhst.bin() == bin_t::NONE) {
158 dprintf("%s #%" PRIu32 " !mhash %s\n",tintstr(),id_,munro.str().c_str());
159 return;
160 }
161
162 if (hs_out_->cont_int_prot_ != POPT_CONT_INT_PROT_NONE) {
163 evbuffer_add_8(evb, SWIFT_INTEGRITY);
164 evbuffer_add_chunkaddr(evb,bhst.bin(),hs_out_->chunk_addr_);
165 evbuffer_add_hash(evb, bhst.hash());
166 }
167
168 dprintf("%s #%" PRIu32 " +mhash %s\n",tintstr(),id_,bhst.bin().str().c_str());
169
170 //fprintf(stderr,"AddLiveSignedMunroHash: speak %s %s\n", bhst.bin().str().c_str(), bhst.hash().hex().c_str() );
171
172 evbuffer_add_8(evb, SWIFT_SIGNED_INTEGRITY);
173 evbuffer_add_chunkaddr(evb,bhst.bin(),hs_out_->chunk_addr_);
174 evbuffer_add_64be(evb, bhst.sigtint().time());
175 evbuffer_add(evb, bhst.sigtint().sig().bits(), bhst.sigtint().sig().length());
176
177 dprintf("%s #%" PRIu32 " +sigh %s %d\n",tintstr(),id_,bhst.bin().str().c_str(), bhst.sigtint().sig().length());
178 }
179
180
181
AddFileUncleHashes(struct evbuffer * evb,bin_t pos)182 void Channel::AddFileUncleHashes(struct evbuffer *evb, bin_t pos)
183 {
184 bin_t peak = hashtree()->peak_for(pos);
185 binvector bv;
186 while (pos!=peak && ((NOW&3)==3 || !pos.parent().contains(data_out_cap_)) &&
187 ack_in_.is_empty(pos.parent())) {
188 // Ric: TODO optimise.. send based on pkt loss statistics
189 // the above is correct but should not happen at the beginning!
190 //while (pos!=peak && ack_in_.is_empty(pos.parent()) ) {
191 bin_t uncle = pos.sibling();
192 bv.push_back(uncle);
193 pos = pos.parent();
194 }
195
196 // PPSP -04: Send in descending layer order
197 binvector::reverse_iterator iter;
198 for (iter=bv.rbegin(); iter != bv.rend(); iter++) {
199 bin_t uncle = *iter;
200 evbuffer_add_8(evb, SWIFT_INTEGRITY);
201 evbuffer_add_chunkaddr(evb,uncle,hs_out_->chunk_addr_);
202 evbuffer_add_hash(evb, hashtree()->hash(uncle));
203 dprintf("%s #%" PRIu32 " +hash %s\n",tintstr(),id_,uncle.str().c_str());
204 }
205
206 }
207
208 //SIGNPEAK
AddLiveUncleHashes(struct evbuffer * evb,bin_t pos,bin_t munro,bool isretransmit)209 void Channel::AddLiveUncleHashes(struct evbuffer *evb, bin_t pos, bin_t munro, bool isretransmit)
210 {
211 binvector bv;
212 if (isretransmit) {
213 // Select all uncles
214 while (pos!=munro) {
215 bin_t uncle = pos.sibling();
216 bv.push_back(uncle);
217 pos = pos.parent();
218 }
219 } else {
220 // Select only unsent uncles
221 // Ric: TODO check (remove data_out_cap??)
222 // For the moment lets keep the old behaviour
223 while (pos!=munro && ((NOW&3)==3 || !pos.parent().contains(data_out_cap_)) &&
224 ack_in_.is_empty(pos.parent())) {
225 //while (pos!=munro && ack_in_.is_empty(pos.parent()) ) {
226 bin_t uncle = pos.sibling();
227 bv.push_back(uncle);
228 pos = pos.parent();
229 }
230 }
231
232 // PPSP -04: Send in descending layer order
233 binvector::reverse_iterator iter;
234 for (iter=bv.rbegin(); iter != bv.rend(); iter++) {
235 bin_t uncle = *iter;
236 evbuffer_add_8(evb, SWIFT_INTEGRITY);
237 evbuffer_add_chunkaddr(evb,uncle,hs_out_->chunk_addr_);
238 Sha1Hash h = hashtree()->hash(uncle);
239 if (h == Sha1Hash::ZERO) {
240 // TEMP SIGNPEAKTODO
241 fprintf(stderr,"SENDING ZERO HASH %s. PRESS\n", uncle.str().c_str());
242 fflush(stderr);
243 }
244 evbuffer_add_hash(evb,h);
245 dprintf("%s #%" PRIu32 " +hash %s\n",tintstr(),id_,uncle.str().c_str());
246 pos = pos.parent();
247 }
248 }
249
250
251
252
253
ImposeHint()254 bin_t Channel::ImposeHint()
255 {
256 uint64_t twist = hs_in_->peer_channel_id_; // got no hints, send something randomly
257
258 twist &= hashtree()->peak(0).toUInt(); // FIXME may make it semi-seq here
259
260 bin_t my_pick = binmap_t::find_complement(ack_in_, *(transfer()->ack_out()), twist);
261 my_pick.to_twisted(twist);
262 while (my_pick.base_length()>max(1,(int)cwnd_))
263 my_pick = my_pick.left();
264
265 return my_pick.twisted(twist);
266 }
267
268
DequeueHint(bool * retransmitptr)269 bin_t Channel::DequeueHint(bool *retransmitptr)
270 {
271 bin_t send = bin_t::NONE;
272
273 // Arno, 2012-01-23: Extra protection against channel loss, don't send DATA
274 if (last_recv_time_ < NOW-(3*TINT_SEC)) {
275 dprintf("%s #%" PRIu32 " dequeue hint aborted, long time no recv %s\n",tintstr(),id_, tintstr(last_recv_time_));
276 return bin_t::NONE;
277 }
278
279 // Arno, 2012-07-27: Reenable Victor's retransmit, check for ACKs
280 *retransmitptr = false;
281 while (!data_out_tmo_.empty()) {
282 tintbin tb = data_out_tmo_.front();
283 data_out_tmo_.pop_front();
284 if (ack_in_.is_filled(tb.bin)) {
285 // chunk was acknowledged in meantime
286 continue;
287 } else {
288 send = tb.bin;
289 dprintf("%s #%" PRIu32 " dequeuing timed-out %s\n",tintstr(),id_, send.str().c_str());
290 *retransmitptr = true;
291 if (!send.is_base())
292 dprintf("%s #%" PRIu32 " Error: retransmit, %s, if not a base bin!\n",tintstr(),id_, send.str().c_str());
293 break;
294 }
295 }
296
297 if (ENABLE_SENDERSIZE_PUSH && send.is_none() && hint_in_.empty() && last_recv_time_>NOW-rtt_avg_-TINT_SEC) {
298 bin_t my_pick = ImposeHint(); // FIXME move to the loop
299 if (!my_pick.is_none()) {
300 hint_in_size_ += my_pick.base_offset();
301 hint_in_.push_back(my_pick);
302 dprintf("%s #%" PRIu32 " *hint %s\n",tintstr(),id_,my_pick.str().c_str());
303 }
304 }
305
306 dprintf("%s #%" PRIu32 " dequeue size: %d\n",tintstr(),id_, (int)hint_in_size_ );
307
308 while (!hint_in_.empty() && send.is_none()) {
309 bin_t hint = hint_in_.front().bin;
310 dprintf("%s #%" PRIu32 " dequeuing cand %s\n",tintstr(),id_, hint.str().c_str());
311
312 tint time = hint_in_.front().time;
313 hint_in_size_ -= hint_in_.front().bin.base_length();
314 hint_in_.pop_front();
315
316 if (time < min(NOW-TINT_SEC*3/2, NOW-(rtt_avg_<<2))) {
317 dprintf("%s #%" PRIu32 " Don't serve: hint %s is too old\n",tintstr(),id_, send.str().c_str());
318 continue;
319 }
320
321 while (!hint.is_base()) { // FIXME optimize; possible attack
322 hint_in_.push_front(tintbin(time,hint.right()));
323 hint_in_size_ += hint_in_.front().bin.base_length();
324 hint = hint.left();
325 }
326
327 if (!ack_in_.is_filled(hint))
328 send = hint;
329 else
330 dprintf("%s #%" PRIu32 " hint %s has already been acknowledged\n",tintstr(),id_,hint.str().c_str());
331 }
332
333 dprintf("%s #%" PRIu32 " dequeued %s [%" PRIu64 "]\n",tintstr(),id_,send.str().c_str(),hint_in_size_);
334 return send;
335 }
336
337
AddHandshake(struct evbuffer * evb)338 void Channel::AddHandshake(struct evbuffer *evb)
339 {
340 // If peer not responding, try legacy swift protocol
341 #if ENABLE_FALLBACK_TO_LEGACY_PROTO == 1
342 if (sent_since_recv_ >= 3 && last_recv_time_ == 0)
343 hs_out_->ResetToLegacy();
344 #endif
345
346 int encoded = -1;
347 if (hs_out_->version_ == VER_SWIFT_LEGACY) {
348 //dprintf("%s #%" PRIu32 " +hs swift legacy\n",tintstr(),id_ );
349 if (hs_in_ == NULL) { // initiating
350 evbuffer_add_8(evb, SWIFT_INTEGRITY);
351 evbuffer_add_32be(evb, bin_toUInt32(bin_t::ALL));
352 evbuffer_add_hash(evb, transfer()->swarm_id().roothash());
353 dprintf("%s #%" PRIu32 " +hash ALL %s\n",
354 tintstr(),id_,transfer()->swarm_id().hex().c_str());
355 }
356 evbuffer_add_8(evb, SWIFT_HANDSHAKE);
357
358 if (send_control_==CLOSE_CONTROL) {
359 encoded = 0;
360 } else
361 encoded = EncodeID(id_);
362 evbuffer_add_32be(evb, encoded);
363
364 dprintf("%s #%" PRIu32 " +hs %x swift\n",tintstr(),id_,encoded);
365 } else { // IETF PPSP compliant
366 //dprintf("%s #%" PRIu32 " +hs ppsp\n",tintstr(),id_ );
367 evbuffer_add_8(evb, SWIFT_HANDSHAKE);
368 if (send_control_==CLOSE_CONTROL) {
369 encoded = 0;
370 } else
371 encoded = EncodeID(id_);
372 evbuffer_add_32be(evb, encoded);
373
374 // Send protocol options
375 std::ostringstream cross;
376 if (send_control_ !=CLOSE_CONTROL) {
377 evbuffer_add_8(evb, POPT_VERSION);
378 evbuffer_add_8(evb, hs_out_->version_);
379 cross << "v" << hs_out_->version_ << " ";
380 evbuffer_add_8(evb, POPT_MIN_VERSION);
381 evbuffer_add_8(evb, hs_out_->min_version_);
382 cross << "nv" << hs_out_->version_ << " ";
383
384 if (hs_in_ == NULL) { // initiating, send swarm ID
385 evbuffer_add_8(evb, POPT_SWARMID);
386 if (transfer()->ttype() == FILE_TRANSFER) {
387 evbuffer_add_16be(evb, Sha1Hash::SIZE);
388 evbuffer_add_hash(evb, transfer()->swarm_id().roothash());
389 } else {
390 SwarmPubKey spubkey = transfer()->swarm_id().spubkey();
391 evbuffer_add_16be(evb, spubkey.length());
392 evbuffer_add(evb,spubkey.bits(),spubkey.length());
393 }
394 cross << "sid " << transfer()->swarm_id().hex() << " ";
395 }
396 evbuffer_add_8(evb, POPT_CONT_INT_PROT);
397 evbuffer_add_8(evb, hs_out_->cont_int_prot_);
398 cross << "cipm " << hs_out_->cont_int_prot_ << " ";
399 if (hs_out_->cont_int_prot_ == POPT_CONT_INT_PROT_MERKLE) {
400 evbuffer_add_8(evb, POPT_MERKLE_HASH_FUNC);
401 evbuffer_add_8(evb, hs_out_->merkle_func_);
402 cross << "mhf " << hs_out_->merkle_func_ << " ";
403 }
404 if (transfer()->ttype() == LIVE_TRANSFER && hs_out_->cont_int_prot_ != POPT_CONT_INT_PROT_NONE) {
405 evbuffer_add_8(evb, POPT_LIVE_SIG_ALG);
406 evbuffer_add_8(evb, hs_out_->live_sig_alg_);
407 cross << "lsa " << hs_out_->live_sig_alg_ << " ";
408 }
409 evbuffer_add_8(evb, POPT_CHUNK_ADDR);
410 evbuffer_add_8(evb, hs_out_->chunk_addr_);
411 cross << "cam " << hs_out_->chunk_addr_ << " ";
412 if (transfer()->ttype() == LIVE_TRANSFER) {
413 evbuffer_add_8(evb, POPT_LIVE_DISC_WND);
414 // For POPT_CHUNK_ADDR_CHUNK32, saves all chunks
415 // PPSPTODO forget
416 if (hs_out_->chunk_addr_ == POPT_CHUNK_ADDR_BIN32 || hs_out_->chunk_addr_ == POPT_CHUNK_ADDR_CHUNK32)
417 evbuffer_add_32be(evb, (uint32_t)hs_out_->live_disc_wnd_);
418 else
419 evbuffer_add_64be(evb, hs_out_->live_disc_wnd_);
420 cross << "ldw " << std::hex << hs_out_->live_disc_wnd_ << std::dec << " ";
421 }
422 }
423 dprintf("%s #%" PRIu32 " +hs %x ppsp %s\n",tintstr(),id_,encoded, cross.str().c_str());
424
425 evbuffer_add_8(evb, POPT_END);
426 }
427
428 have_out_.clear();
429 }
430
431
Send()432 void Channel::Send()
433 {
434
435 dprintf("%s #%" PRIu32 " Send called \n",tintstr(),id_);
436
437 // Ledbat log
438 // Time - PingPong - SlowStart - CC - KeepAlive - Close - CCwindow - Loss
439 if (id_==1)
440 switch (send_control_) {
441 case KEEP_ALIVE_CONTROL:
442 lprintf("%lu \t %d \t %d \t %d \t %li \t %d \t %d \t %d \t %li \t %li\n", NOW-open_time_, 0, 0, 0, NOW-last_send_time_,
443 0, 0, 0, dip_avg_, hint_out_size_);
444 break;
445 case PING_PONG_CONTROL:
446 lprintf("%lu \t %li \t %d \t %d \t %d \t %d \t %d \t %d \n", NOW-open_time_, NOW-last_send_time_, 0, 0, 0, 0, 0, 0);
447 break;
448 case SLOW_START_CONTROL:
449 lprintf("%lu \t %d \t %li \t %d \t %d \t %d \t %d \t %d \n", NOW-open_time_, 0, NOW-last_send_time_, 0, 0, 0, 0, 0);
450 break;
451 case AIMD_CONTROL:
452 lprintf("%lu \t %d \t %d \t %li \t %d \t %d \t %d \t %.2f \n", NOW-open_time_, 0, 0, NOW-last_send_time_, 0, 0, 0,
453 cwnd_);
454 break;
455 case LEDBAT_CONTROL:
456 lprintf("%lu \t %d \t %d \t %li \t %d \t %d \t %d \t %.2f \t %li\n", NOW-open_time_, 0, 0, NOW-last_send_time_, 0, 0, 0,
457 cwnd_, hint_in_size_);
458 break;
459 case CLOSE_CONTROL:
460 lprintf("%lu \t %d \t %d \t %d \t %d \t %li \t %d \t %d \n", NOW-open_time_, 0, 0, 0, 0, NOW-last_send_time_, 0, 0);
461 break;
462 default:
463 assert(false);
464 break;
465 }
466
467 struct evbuffer *evb = evbuffer_new();
468 uint32_t pcid = 0;
469 if (hs_in_ != NULL)
470 pcid = hs_in_->peer_channel_id_;
471
472 evbuffer_add_32be(evb,pcid);
473 bin_t data = bin_t::NONE;
474 int evbnonadplen = 0;
475 if (send_control_==CLOSE_CONTROL) // Arno: send explicit close
476 AddHandshake(evb);
477 else {
478 if (is_established()) {
479 // FIXME: seeder check
480 AddHave(evb);
481 AddAck(evb);
482 //LIVE
483 if (hashtree() == NULL || !hashtree()->is_complete()) {
484 AddHint(evb);
485 /* Gertjan fix: 7aeea65f3efbb9013f601b22a57ee4a423f1a94d
486 "Only call Reschedule for 'reverse PEX' if the channel is in keep-alive mode"
487 */
488 AddPexReq(evb);
489 if (ENABLE_CANCEL)
490 AddCancel(evb);
491 }
492 AddPex(evb);
493 TimeoutDataOut();
494 data = AddData(evb);
495 } else {
496 AddHandshake(evb);
497 AddHave(evb); // Arno, 2011-10-28: from AddHandShake. Why double?
498 AddHave(evb);
499 AddAck(evb);
500 }
501 }
502 lastsendwaskeepalive_ = (evbuffer_get_length(evb) == 4);
503
504 if (evbuffer_get_length(evb)==4) {// only the channel id; bare keep-alive
505 data = bin_t::ALL;
506 }
507
508 dprintf("%s #%" PRIu32 " sent %ib %s:%x\n",
509 tintstr(),id_,(int)evbuffer_get_length(evb),peer().str().c_str(),
510 pcid);
511 last_send_time_ = NOW;
512
513 int r = SendTo(socket_,peer(),evb);
514 if (r==-1)
515 print_error("swift can't send datagram");
516 else {
517 raw_bytes_up_ += r;
518 sent_since_recv_++;
519 dgrams_sent_++;
520 }
521 evbuffer_free(evb);
522 Reschedule();
523 }
524
AddHint(struct evbuffer * evb)525 void Channel::AddHint(struct evbuffer *evb)
526 {
527
528 // LIVE source
529 if (transfer()->picker() == NULL)
530 return;
531
532 // RATELIMIT
533 // Policy is to not send hints when we are above speed limit
534 if (transfer()->GetCurrentSpeed(DDIR_DOWNLOAD) > transfer()->GetMaxSpeed(DDIR_DOWNLOAD)) {
535 if (DEBUGTRAFFIC)
536 fprintf(stderr,"hint: forbidden#");
537 return;
538 }
539
540 FileTransfer * ft = (FileTransfer *)transfer();
541
542 if (ft->hashtree()->is_complete())
543 return;
544
545 // 1. Calc max of what we are allowed to request, uncongested bandwidth wise
546 tint plan_for = max(TINT_SEC*HINT_TIME,rtt_avg_<<2);
547 tint timed_out = NOW - plan_for*2;
548
549 std::deque<bin_t> tbc;
550 while (!hint_out_.empty() && hint_out_.front().time < timed_out) {
551 bin_t hint = hint_out_.front().bin;
552 hint_out_size_ -= hint.base_length();
553 hint_out_.pop_front();
554 // Ric: keep track of what we want to remove
555 tbc.push_back(hint);
556 dprintf("%s #%" PRIu32 " remove hint %s\n",tintstr(),id_,hint.str().c_str());
557
558 }
559
560 // Ric: calculate the dip over the last period
561 // use same approach as LEDBAT!
562 // We may apply a filter over the elements.. as suggested in the rfc
563 ttqueue::iterator it = dip_list_.begin();
564 int32_t count = 0;
565 int total = 0;
566 tint dip = 0;
567 tint timeout = NOW - plan_for/2;
568 // use the dip received during the last period, or the dip_avg_
569 while (it != dip_list_.end() && (it->second > timeout || count < 4)) {
570 total += it->first;
571 count++;
572 it++;
573 }
574 // clean up
575 while (dip_list_.size() > 4 && dip_list_.back().second < timeout) {
576 dip_list_.pop_back();
577 }
578 if (dip_list_.size() < 4)
579 dip = dip_avg_;
580 else
581 dip = total/count;
582
583 int first_plan_pck = (tint)1;
584 if (dip != 0)
585 first_plan_pck = max((tint)1, plan_for / dip);
586
587 // Riccardo, 2012-04-04: Actually allowed is max minus what we already asked for
588 int queue_allowed_hints = max(0,first_plan_pck-(int)hint_out_size_);
589
590
591 // RATELIMIT
592 // 2. Calc max of what is allowed by the rate limiter
593 int rate_allowed_hints = INT_MAX;
594 uint64_t rough_global_hint_out_size = 0; // rough estimate, as hint_out_ clean up is not done for all channels
595 bool count_hints = false;
596 if (transfer()->GetMaxSpeed(DDIR_DOWNLOAD) < DBL_MAX) {
597 channels_t::iterator iter;
598 for (iter=transfer()->GetChannels()->begin(); iter!=transfer()->GetChannels()->end(); iter++) {
599 Channel *c = *iter;
600 if (c != NULL)
601 rough_global_hint_out_size += c->hint_out_size_;
602 }
603
604 // Policy: this channel is allowed to hint at the limit - global_hinted_at
605 // Handle MaxSpeed = unlimited
606 double rate_hints_limit_float = HINT_TIME*transfer()->GetMaxSpeed(DDIR_DOWNLOAD)/((double)transfer()->chunk_size());
607
608 // Ric: slow down if we just started the connection
609 double slowStart = (double)LONG_MAX;
610 tint running = now_t::now-start;
611 // It takes ~3 sec to get a stable DL speed estimation
612 if (running<TINT_SEC*3) {// && hint_out_size_>1) {
613 count_hints = true;
614 slowStart = rate_hints_limit_float*running/TINT_SEC;
615 if (slowStart>transfer()->GetSlowStartHints())
616 slowStart = slowStart-transfer()->GetSlowStartHints();
617 else
618 slowStart = 0;
619 if (DEBUGTRAFFIC)
620 fprintf(stderr, "slowStart: %lf [%" PRIu32 "]\n", slowStart, transfer()->GetSlowStartHints());
621 }
622
623 int rate_hints_limit = (int)min(slowStart,rate_hints_limit_float);
624
625 // Actually allowed is max minus what we already asked for, globally (=all channels)
626 rate_allowed_hints = max(0,rate_hints_limit-(int)rough_global_hint_out_size);
627
628 if (DEBUGTRAFFIC)
629 fprintf(stderr,"hint c%" PRIu32 ": rate_hints_limit %d rallow %d globout %" PRIu64 "\n", id(), rate_hints_limit,
630 rate_allowed_hints, rough_global_hint_out_size);
631
632 }
633 // Ric: test TODO
634 //if (DEBUGTRAFFIC)
635 //fprintf(stderr,"hint c%" PRIu32 ": %lf want %d qallow %d rallow %d chanout %" PRIu64 " globout %" PRIu64 "\n", id(),
636 // transfer()->GetCurrentSpeed(DDIR_DOWNLOAD), first_plan_pck, queue_allowed_hints, rate_allowed_hints, hint_out_size_,
637 // rough_global_hint_out_size);
638 dprintf("%s #%" PRIu32 "hint c%" PRIu32 ": %lf want %d qallow %d rallow %d chanout %" PRIu64 " globout %" PRIu64 "\n",tintstr(),id_, id(),
639 transfer()->GetCurrentSpeed(DDIR_DOWNLOAD), first_plan_pck, queue_allowed_hints, rate_allowed_hints, hint_out_size_,
640 rough_global_hint_out_size);
641 // 3. Take the smallest allowance from rate and queue limit
642 // Ric: test: TODO remove
643 //uint64_t plan_pck = (uint64_t)min(rate_allowed_hints,first_plan_pck);
644 uint64_t plan_pck = (uint64_t)min(rate_allowed_hints,queue_allowed_hints);
645
646
647 // 4. Ask allowance in blocks of chunks to get pipelining going from serving peer.
648 // Arno, 2012-10-30: not HINT_GRANULARITY for LIVE
649 //if (hint_out_size_ == 0 || plan_pck >= HINT_GRANULARITY || transfer()->ttype() == LIVE_TRANSFER)
650 if (plan_pck >= HINT_GRANULARITY || transfer()->ttype() == LIVE_TRANSFER) {
651 bin_t hint = bin_t::NONE;
652 if (transfer()->ttype() == LIVE_TRANSFER)
653 hint = transfer()->picker()->Pick(ack_in_,plan_pck,NOW+plan_for*2,id_);
654 else {
655 //fprintf(stderr, "want %d\tplan %d\n", want, plan_pck);
656 hint = DequeueHintOut(plan_pck);
657
658 if (hint.is_none()) {
659 bin_t res = transfer()->picker()->Pick(ack_in_,plan_pck,NOW+plan_for*2,id_);
660 if (!res.is_none()) {
661 hint_queue_out_.push_back(tintbin(NOW,res));
662 hint_queue_out_size_ += res.base_length();
663 hint = DequeueHintOut(plan_pck);
664 }
665 }
666 }
667
668 if (!hint.is_none()) {
669 if (DEBUGTRAFFIC) {
670 fprintf(stderr,"hint c%d: ask %s\n", id(), hint.str().c_str());
671 }
672 evbuffer_add_8(evb, SWIFT_REQUEST);
673 evbuffer_add_chunkaddr(evb,hint,hs_out_->chunk_addr_);
674 dprintf("%s #%" PRIu32 " +hint %s [%" PRIi64 "]\n",tintstr(),id_,hint.str().c_str(),hint_out_size_);
675 dprintf("%s #%" PRIu32 " +hint base %s width %d\n",tintstr(),id_,hint.base_left().str().c_str(),
676 (int)hint.base_length());
677 //fprintf(stderr,"send c%d: HINTLEN %i\n", id(), hint.base_length());
678 //fprintf(stderr,"HL %i ", hint.base_length());
679
680 #if ENABLE_CANCEL == 1
681 // Ric: final cancel the hints that have been removed
682 while (!tbc.empty()) {
683 bin_t c = tbc.front();
684 if (!c.contains(hint) && !hint.contains(c))
685 cancel_out_.push_back(c);
686 else if (c.contains(hint))
687 while (c.contains(hint) && c!=hint) {
688 if (c>hint)
689 c.to_left();
690 else
691 c.to_right();
692 cancel_out_.push_back(c.sibling());
693 }
694 else if (c == hint)
695 break;
696 tbc.pop_front();
697 }
698 #endif
699 hint_out_.push_back(hint);
700 hint_out_size_ += hint.base_length();
701
702 // Ric: keep track of the outstanding hints
703 if (count_hints)
704 transfer()->SetSlowStartHints(hint.base_length());
705
706 // RTTFIX
707 if (rtt_hint_tintbin_.bin == bin_t::NONE) {
708 rtt_hint_tintbin_.bin = hint.base_left();
709 rtt_hint_tintbin_.time = NOW;
710 }
711
712 return;
713 } else
714 dprintf("%s #%" PRIu32 " Xhint\n",tintstr(),id_);
715 }
716 #if ENABLE_CANCEL == 1
717 // add the temporary cancel bin to the actual cancel queue
718 while (!tbc.empty()) {
719 cancel_out_.push_back(tbc.front());
720 tbc.pop_front();
721 }
722 #endif
723 }
724
ChunkAddrSize(popt_chunk_addr_t ca)725 static int ChunkAddrSize(popt_chunk_addr_t ca)
726 {
727 switch (ca) {
728 case POPT_CHUNK_ADDR_BIN32:
729 return 4;
730 case POPT_CHUNK_ADDR_BYTE64:
731 return 2*8;
732 case POPT_CHUNK_ADDR_CHUNK32:
733 return 2*4;
734 case POPT_CHUNK_ADDR_BIN64:
735 return 8;
736 case POPT_CHUNK_ADDR_CHUNK64:
737 return 2*8;
738 }
739 return 0;
740 }
741
AddCancel(struct evbuffer * evb)742 void Channel::AddCancel(struct evbuffer *evb)
743 {
744
745 // SIGNPEAKTODO
746 return;
747
748
749 // Arno, 2013-01-15: take into account chunk addressing scheme
750 while (SWIFT_MAX_NONDATA_DGRAM_SIZE-evbuffer_get_length(evb) >= 1+ChunkAddrSize(hs_out_->chunk_addr_)
751 && !cancel_out_.empty()) {
752 bin_t cancel = cancel_out_.front();
753 cancel_out_.pop_front();
754 evbuffer_add_8(evb, SWIFT_CANCEL);
755 evbuffer_add_chunkaddr(evb,cancel,hs_out_->chunk_addr_);
756 dprintf("%s #%" PRIu32 " +cancel %s\n",
757 tintstr(),id_,cancel.str().c_str());
758 }
759 }
760
AddData(struct evbuffer * evb)761 bin_t Channel::AddData(struct evbuffer *evb)
762 {
763 // RATELIMIT
764 if (transfer()->GetCurrentSpeed(DDIR_UPLOAD) > transfer()->GetMaxSpeed(DDIR_UPLOAD)) {
765 transfer()->OnSendNoData();
766 return bin_t::NONE;
767 }
768 //LIVE
769 if (transfer()->ttype() == FILE_TRANSFER && !hashtree()->size()) // know nothing
770 return bin_t::NONE;
771
772 // Arno: If we are serving content, keep activated
773 swift::Touch(transfer()->td());
774
775 bin_t tosend = bin_t::NONE;
776 bool isretransmit = false;
777 tint luft = send_interval_>>4; // may wake up a bit earlier
778
779 if ((data_out_size_<cwnd_ || cwnd_>0) && last_data_out_time_+send_interval_-reschedule_delay_<=NOW+luft) {
780 tosend = DequeueHint(&isretransmit);
781 if (tosend.is_none()) {
782 dprintf("%s #%" PRIu32 " sendctrl no idea what data to send\n",tintstr(),id_);
783 if (send_control_!=KEEP_ALIVE_CONTROL && send_control_!=CLOSE_CONTROL) {
784 lprintf("\t\t==== Switch to Keep Alive Control (nothing to send) ==== \n");
785 keepalivereason_ = NOTHING_TO_SEND;
786 SwitchSendControl(KEEP_ALIVE_CONTROL);
787 }
788 }
789 } else
790 dprintf("%s #%" PRIu32 " sendctrl wait cwnd %f data_out %i next %s\n",
791 tintstr(),id_,cwnd_,data_out_size_,tintstr(last_data_out_time_+send_interval_));
792
793 // Add required hashes. Also for initial peaks and munros
794 // Note this is called always, not just when there are requests pending.
795 AddRequiredHashes(evb,tosend,isretransmit);
796
797 if (tosend.is_none()) {// && (last_data_out_time_>NOW-TINT_SEC || data_out_.empty()))
798 transfer()->OnSendNoData();
799 return bin_t::NONE; // once in a while, empty data is sent just to check rtt FIXED
800 }
801
802 if (!ack_in_.is_empty()) // TODO: cwnd_>1
803 data_out_cap_ = tosend;
804
805 // Send hashes in separate datagram if first would get too big
806 SendIfTooBig(evb);
807
808 // Add chunk
809 evbuffer_add_8(evb, SWIFT_DATA);
810 evbuffer_add_chunkaddr(evb,tosend,hs_out_->chunk_addr_);
811 // PPSPTODO LEDBAT current system time 64-bit
812 if (hs_in_ != NULL && hs_in_->version_ == VER_PPSPP_v1) {
813 // NOTE: Time updates NOW, so customary behavior where NOW is not
814 // updated during the handling of a message (just at start) is no longer
815 // there. Not sure if this matters.
816 evbuffer_add_64be(evb, Time());
817 }
818
819 struct evbuffer_iovec vec;
820 if (evbuffer_reserve_space(evb, transfer()->chunk_size(), &vec, 1) < 0) {
821 print_error("error on evbuffer_reserve_space");
822 return bin_t::NONE;
823 }
824
825 if (DEBUGTRAFFIC)
826 dprintf("%s #%" PRIu32 " ?data reading swarm %llu\n",tintstr(),id_, tosend.base_offset()*transfer()->chunk_size());
827
828 ssize_t r = transfer()->GetStorage()->Read((char *)vec.iov_base,
829 transfer()->chunk_size(),tosend.base_offset()*transfer()->chunk_size());
830 // TODO: corrupted data, retries, caching
831 if (r <= 0) {
832 print_error("error on reading");
833
834 dprintf("%s #%" PRIu32 " !data %s\n",tintstr(),id_,tosend.str().c_str());
835 vec.iov_len = 0;
836 evbuffer_commit_space(evb, &vec, 1);
837 return bin_t::NONE;
838 }
839 // assert(dgram.space()>=r+4+1);
840 vec.iov_len = r;
841 if (evbuffer_commit_space(evb, &vec, 1) < 0) {
842 print_error("error on evbuffer_commit_space");
843 return bin_t::NONE;
844 }
845
846 last_data_out_time_ = NOW;
847 data_out_.push_back(tosend);
848 data_out_size_++;
849 bytes_up_ += r;
850 global_bytes_up += r;
851
852 dprintf("%s #%" PRIu32 " +data %s\n",tintstr(),id_,tosend.str().c_str());
853
854 // RATELIMIT
855 // ARNOSMPTODO: count overhead bytes too? Move to Send() then.
856 transfer_->OnSendData(transfer()->chunk_size());
857
858 return tosend;
859 }
860
861
SendIfTooBig(struct evbuffer * evb)862 void Channel::SendIfTooBig(struct evbuffer *evb)
863 {
864 // Arno, 2011-11-03: May happen when first data packet is sent to empty
865 // leech, then peak + uncle hashes may be so big that they don't fit in eth
866 // frame with DATA. Send 2 datagrams then, one with peaks so they have
867 // a better chance of arriving. Optimistic violation of atomic datagram
868 // principle.
869 // Arno, 2013-05-14: Don't work if this is first msg, as peer_channel_id
870 // will be unknown. Then just continue adding to the first datagram and
871 // hope for the best.
872 if (is_established() && transfer()->chunk_size() == SWIFT_DEFAULT_CHUNK_SIZE
873 && evbuffer_get_length(evb) > SWIFT_MAX_NONDATA_DGRAM_SIZE) {
874
875 dprintf("%s #%" PRIu32 " fsent %ib %s:%x\n",
876 tintstr(),id_,(int)evbuffer_get_length(evb),peer().str().c_str(),
877 hs_in_->peer_channel_id_);
878 int ret = Channel::SendTo(socket_,peer(),evb); // kind of fragmentation
879 if (ret > 0)
880 raw_bytes_up_ += ret;
881 evbuffer_add_32be(evb, hs_in_->peer_channel_id_);
882 }
883 }
884
885
AddAck(struct evbuffer * evb)886 void Channel::AddAck(struct evbuffer *evb)
887 {
888 if (data_in_==tintbin())
889 //if (data_in_.bin==bin64_t::NONE)
890 return;
891 // sometimes, we send a HAVE (e.g. in case the peer did repetitive send)
892 evbuffer_add_8(evb, data_in_.time==TINT_NEVER?SWIFT_HAVE:SWIFT_ACK);
893 evbuffer_add_chunkaddr(evb,data_in_.bin,hs_out_->chunk_addr_);
894 // PPSPTODO LEDBAT one-way delay
895 if (data_in_.time!=TINT_NEVER)
896 evbuffer_add_64be(evb, data_in_.time);
897
898 if (DEBUGTRAFFIC)
899 fprintf(stderr,"send c%d: ACK %i\n", id(), bin_toUInt32(data_in_.bin));
900
901 have_out_.set(data_in_.bin);
902 dprintf("%s #%" PRIu32 " +ack %s %" PRIi64 "\n",
903 tintstr(),id_,data_in_.bin.str().c_str(),data_in_.time);
904 if (data_in_.bin.layer()>2)
905 data_in_dbl_ = data_in_.bin;
906
907 #if ENABLE_CANCEL == 1
908 // Ric: check that we are not sending a cancel msg for data_in_
909 std::deque<bin_t>::iterator it;
910 bin_t b = data_in_.bin;
911 for (it=cancel_out_.begin(); it!=cancel_out_.end(); it++) {
912 bin_t c = *it;
913 if (c == b) {
914 cancel_out_.erase(it);
915 break;
916 }
917 // b is always a single chunk :-)
918 else if (c.contains(b)) {
919 while (c.contains(b) && c!=b) {
920 if (c>b) {
921 cancel_out_.insert(it+1,c.right());
922 c.to_left();
923 } else {
924 cancel_out_.insert(it+1,c.left());
925 c.to_right();
926 }
927 }
928 assert(c==b);
929 cancel_out_.erase(it);
930 }
931 }
932 #endif
933 data_in_ = tintbin();
934 //data_in_ = tintbin(NOW,bin64_t::NONE);
935 }
936
937
AddHave(struct evbuffer * evb)938 void Channel::AddHave(struct evbuffer *evb)
939 {
940 if (!data_in_dbl_.is_none()) { // TODO: do redundancy better
941 evbuffer_add_8(evb, SWIFT_HAVE);
942 evbuffer_add_chunkaddr(evb,data_in_dbl_,hs_out_->chunk_addr_);
943 data_in_dbl_=bin_t::NONE;
944 }
945 if (DEBUGTRAFFIC)
946 fprintf(stderr,"send c%d: HAVE ",id());
947
948 // ZEROSTATE
949 if (transfer()->ttype() == FILE_TRANSFER && ((FileTransfer *)transfer())->IsZeroState()) {
950 if (is_established())
951 return;
952
953 // Say we have peaks
954 for (int i=0; i<hashtree()->peak_count(); i++) {
955 bin_t peak = hashtree()->peak(i);
956 evbuffer_add_8(evb, SWIFT_HAVE);
957 evbuffer_add_chunkaddr(evb,peak,hs_out_->chunk_addr_);
958 dprintf("%s #%" PRIu32 " +have %s\n",tintstr(),id_,peak.str().c_str());
959 }
960 return;
961 }
962
963 // SIGNPEAK
964 binmap_t *transfer_ack_out_ptr = transfer()->ack_out();
965 if (hs_in_ != NULL && hs_in_->cont_int_prot_ == POPT_CONT_INT_PROT_UNIFIED_MERKLE) {
966 // Arno, 2013-02-26: LIVE SIGNPEAK Cannot send HAVEs not covered by
967 // signed peak when source. Non-source peers will consequently only
968 // see and receive chunks covered by signed peaks.
969 LiveTransfer *lt = (LiveTransfer *)transfer();
970 if (lt->am_source())
971 transfer_ack_out_ptr = lt->ack_out_signed();
972 }
973 for (int count=0; count<4; count++) {
974 bin_t ack = binmap_t::find_complement(have_out_, *(transfer_ack_out_ptr), 0); // FIXME: do rotating queue
975 if (ack.is_none())
976 break;
977 ack = transfer_ack_out_ptr->cover(ack);
978 have_out_.set(ack);
979 evbuffer_add_8(evb, SWIFT_HAVE);
980 evbuffer_add_chunkaddr(evb,ack,hs_out_->chunk_addr_);
981
982 if (DEBUGTRAFFIC)
983 fprintf(stderr," %i", bin_toUInt32(ack));
984
985 dprintf("%s #%" PRIu32 " +have %s\n",tintstr(),id_,ack.str().c_str());
986 }
987 if (DEBUGTRAFFIC)
988 fprintf(stderr,"\n");
989 }
990
991
Recv(struct evbuffer * evb)992 void Channel::Recv(struct evbuffer *evb)
993 {
994 dprintf("%s #%" PRIu32 " recvd %ib\n",tintstr(),id_,(int)evbuffer_get_length(evb)+4);
995 dgrams_rcvd_++;
996
997 if (!transfer()->IsOperational()) {
998 dprintf("%s #%" PRIu32 " recvd on broken transfer %d \n",tintstr(),id_, transfer()->td());
999 CloseOnError();
1000 return;
1001 }
1002
1003 lastrecvwaskeepalive_ = (evbuffer_get_length(evb) == 0);
1004 if (lastrecvwaskeepalive_)
1005 // Update speed measurements such that they decrease when DL stops
1006 transfer()->OnRecvData(0);
1007
1008 if (last_send_time_ && rtt_avg_==TINT_SEC && dev_avg_==0) {
1009 rtt_avg_ = NOW - last_send_time_;
1010 dev_avg_ = rtt_avg_;
1011 dip_avg_ = rtt_avg_;
1012 dprintf("%s #%" PRIu32 " sendctrl rtt init %" PRIi64 "\n",tintstr(),id_,rtt_avg_);
1013 //fprintf(stderr,"%s #%" PRIu32 " sendctrl rtt init %" PRIi64 "\n",tintstr(),id_,rtt_avg_);
1014 }
1015
1016 bin_t data = evbuffer_get_length(evb) ? bin_t::NONE : bin_t::ALL;
1017
1018 if (DEBUGTRAFFIC)
1019 fprintf(stderr,"recv c%" PRIu32 ": size " PRISIZET "\n", id(), evbuffer_get_length(evb));
1020
1021 Handshake *hishs = NULL;
1022 if (hs_in_ == NULL) { // first reply from client
1023 if (hs_out_->version_ == VER_SWIFT_LEGACY) // I sent PPSPP_v1 HS, did not respond, now trying legacy
1024 hishs = StaticOnHandshake(peer_,id(),true,VER_SWIFT_LEGACY,evb);
1025 else
1026 hishs = StaticOnHandshake(peer_,id(),false,VER_PPSPP_v1,evb);
1027 if (hishs == NULL)
1028 return;
1029 else
1030 OnHandshake(hishs);
1031 }
1032 while (evbuffer_get_length(evb) && send_control_!=CLOSE_CONTROL) {
1033 uint8_t type = evbuffer_remove_8(evb);
1034
1035 if (DEBUGTRAFFIC)
1036 fprintf(stderr,"GOT %d\n", type);
1037
1038 switch (type) {
1039 case SWIFT_HANDSHAKE: // explicit close
1040 hishs = StaticOnHandshake(peer_,id(),true,hs_in_->version_,evb);
1041 if (hishs == NULL)
1042 return;
1043 OnHandshake(hishs);
1044 break;
1045 case SWIFT_DATA:
1046 if (transfer()->ttype() == FILE_TRANSFER && ((FileTransfer *)transfer())->IsZeroState())
1047 OnDataZeroState(evb);
1048 else
1049 data=OnData(evb);
1050 break;
1051 case SWIFT_HAVE:
1052 if (transfer()->ttype() == FILE_TRANSFER && ((FileTransfer *)transfer())->IsZeroState())
1053 OnHaveZeroState(evb);
1054 else
1055 OnHave(evb);
1056 break;
1057 case SWIFT_ACK:
1058 OnAck(evb);
1059 break;
1060 case SWIFT_INTEGRITY:
1061 if (transfer()->ttype() == FILE_TRANSFER && ((FileTransfer *)transfer())->IsZeroState())
1062 OnHashZeroState(evb);
1063 else
1064 OnHash(evb);
1065 break;
1066 case SWIFT_SIGNED_INTEGRITY: // PPSP
1067 OnSignedHash(evb);
1068 break;
1069 case SWIFT_REQUEST:
1070 OnHint(evb);
1071 break;
1072 case SWIFT_CANCEL: // PPSP
1073 OnCancel(evb);
1074 break;
1075 case SWIFT_PEX_RESv4:
1076 if (transfer()->ttype() == FILE_TRANSFER && ((FileTransfer *)transfer())->IsZeroState())
1077 OnPexAddZeroState(evb,AF_INET);
1078 else
1079 OnPexAdd(evb,AF_INET);
1080 break;
1081 case SWIFT_PEX_RESv6: // PPSP
1082 if (transfer()->ttype() == FILE_TRANSFER && ((FileTransfer *)transfer())->IsZeroState())
1083 OnPexAddZeroState(evb,AF_INET6);
1084 else
1085 OnPexAdd(evb,AF_INET6);
1086 break;
1087 case SWIFT_PEX_REScert: // PPSP
1088 if (transfer()->ttype() == FILE_TRANSFER && ((FileTransfer *)transfer())->IsZeroState())
1089 OnPexAddCertZeroState(evb);
1090 else
1091 OnPexAddCert(evb);
1092 break;
1093 case SWIFT_PEX_REQ:
1094 if (transfer()->ttype() == FILE_TRANSFER && ((FileTransfer *)transfer())->IsZeroState())
1095 OnPexReqZeroState(evb);
1096 else
1097 OnPexReq();
1098 break;
1099 case SWIFT_CHOKE: // PPSP
1100 OnChoke(evb);
1101 break;
1102 case SWIFT_UNCHOKE: // PPSP
1103 OnUnchoke(evb);
1104 break;
1105 default:
1106 dprintf("%s #%" PRIu32 " ?msg id unknown %i\n",tintstr(),id_,(int)type);
1107 return;
1108 }
1109 }
1110 if (DEBUGTRAFFIC) {
1111 fprintf(stderr,"\n");
1112 }
1113
1114 last_recv_time_ = NOW;
1115 sent_since_recv_ = 0;
1116
1117 // Arno: see if transfer still in working order
1118 transfer()->UpdateOperational();
1119 if (!transfer()->IsOperational()) {
1120 dprintf("%s #%" PRIu32 " recvd broke transfer %d \n",tintstr(),id_, transfer()->td());
1121 CloseOnError();
1122 return;
1123 }
1124
1125 Reschedule();
1126 }
1127
1128
CloseOnError()1129 void Channel::CloseOnError()
1130 {
1131 Close(CLOSE_SEND_IF_ESTABLISHED);
1132 }
1133
1134
1135 /*
1136 * Arno: FAXME: HASH+DATA should be handled as a transaction: only when the
1137 * hashes check out should they be stored in the hashtree, otherwise revert.
1138 */
OnHash(struct evbuffer * evb)1139 void Channel::OnHash(struct evbuffer *evb)
1140 {
1141 if (hs_in_->cont_int_prot_ != POPT_CONT_INT_PROT_MERKLE
1142 && hs_in_->cont_int_prot_ != POPT_CONT_INT_PROT_UNIFIED_MERKLE) {
1143 dprintf("%s #%" PRIu32 " ?hash but no integrity prot\n",tintstr(),id_);
1144 return;
1145 }
1146
1147 binvector bv = evbuffer_remove_chunkaddr(evb,hs_in_->chunk_addr_);
1148 if (bv.size() == 0 || bv.size() > 1) {
1149 // chunk spec for hash must be power-of-2 range, so must fit in single bin
1150 dprintf("%s #%" PRIu32 " ?hash bad chunk spec\n",tintstr(),id_);
1151 evbuffer_drain(evb, evbuffer_get_length(evb));
1152 Close(CLOSE_DO_NOT_SEND);
1153 return;
1154 }
1155 bin_t pos = bv.front();
1156 Sha1Hash hash = evbuffer_remove_hash(evb);
1157
1158 dprintf("%s #%" PRIu32 " -hash %s\n",tintstr(),id_,pos.str().c_str());
1159 if (hashtree() != NULL && (hs_in_->cont_int_prot_ == POPT_CONT_INT_PROT_MERKLE
1160 || hs_in_->cont_int_prot_ == POPT_CONT_INT_PROT_UNIFIED_MERKLE)) {
1161 if (transfer()->ttype() == LIVE_TRANSFER) {
1162 // Don't accept hashes from others as source
1163 LiveTransfer *lt = (LiveTransfer *)transfer();
1164 if (lt->am_source())
1165 return;
1166 }
1167 hashtree()->OfferHash(pos,hash);
1168 }
1169
1170 //fprintf(stderr,"HASH %" PRIi64 " hex %s\n",pos.toUInt(), hash.hex().c_str() );
1171 }
1172
1173
CleanHintOut(bin_t pos)1174 void Channel::CleanHintOut(bin_t pos)
1175 {
1176 int hi = 0;
1177 while (hi<hint_out_.size() && !hint_out_[hi].bin.contains(pos))
1178 hi++;
1179 if (hi==hint_out_.size())
1180 return; // something not hinted or hinted in far past
1181
1182 // Ric: TODO allow reordering of arriving pkts
1183 while (hi--) { // removing likely snubbed hints
1184 bin_t hint = hint_out_.front().bin;
1185 hint_out_size_ -= hint.base_length();
1186 hint_out_.pop_front();
1187 dprintf("%s #%" PRIu32 " Clean outstanding hint %s\n",tintstr(),id_,hint.str().c_str());
1188 #if ENABLE_CANCEL == 1
1189 // Ric: add to the cancel queue
1190 cancel_out_.push_back(hint);
1191 #endif
1192 }
1193 while (hint_out_.front().bin!=pos) {
1194 tintbin f = hint_out_.front();
1195
1196 assert(f.bin.contains(pos));
1197
1198 if (pos < f.bin) {
1199 f.bin.to_left();
1200 } else {
1201 f.bin.to_right();
1202 }
1203
1204 hint_out_.front().bin = f.bin.sibling();
1205 hint_out_.push_front(f);
1206 }
1207 #if ENABLE_CANCEL == 1
1208 // Ric: add to the cancel queue
1209 cancel_out_.push_back(hint_out_.front().bin);
1210 #endif
1211 hint_out_size_ -= hint_out_.front().bin.base_length();
1212 hint_out_.pop_front();
1213 }
1214
DequeueHintOut(uint64_t size)1215 bin_t Channel::DequeueHintOut(uint64_t size)
1216 {
1217
1218 if (DEBUGTRAFFIC)
1219 fprintf(stderr, "%s #%" PRIu32 " Dequeue hint out size (%" PRIu64 ") [%" PRIu64 " are req] ",tintstr(),id_,
1220 hint_queue_out_size_, size);
1221
1222 // check for hints that have already been downloaded
1223 // in order to optimise the retrieval, some bins might be outdated
1224 while (hint_queue_out_.size() && !transfer()->ack_out()->is_empty(hint_queue_out_.front().bin)) {
1225 if (DEBUGTRAFFIC)
1226 dprintf("%s #%" PRIu32 " candidate hint :%s not empty!\n",tintstr(),id_, hint_queue_out_.front().bin.str().c_str());
1227 tintbin tb = hint_queue_out_.front();
1228 bin_t bin = tb.bin;
1229 tint time = tb.time;
1230 hint_queue_out_.pop_front();
1231 hint_queue_out_size_ -= bin.base_length();
1232
1233 while (!bin.is_base() && !transfer()->ack_out()->is_filled(bin)) {
1234 hint_queue_out_.push_front(tintbin(time, bin.right()));
1235 hint_queue_out_size_ += hint_queue_out_.front().bin.base_length();
1236 bin.to_left();
1237 }
1238
1239 if (transfer()->ack_out()->is_empty(bin)) {
1240 hint_queue_out_.push_front(tintbin(time, bin));
1241 hint_queue_out_size_ += bin.base_length();
1242 }
1243 }
1244
1245 // TODO check... the seconds should depend on previous speed of the peer
1246 while (hint_queue_out_.size() && hint_queue_out_.front().time<NOW-TINT_SEC*HINT_TIME*3/2) { // FIXME sec
1247 hint_queue_out_size_ -= hint_queue_out_.front().bin.base_length();
1248 if (DEBUGTRAFFIC)
1249 dprintf("%s #%" PRIu32 " Removing queued hint:%s\n",tintstr(),id_, hint_queue_out_.front().bin.str().c_str());
1250 hint_queue_out_.pop_front();
1251 }
1252
1253 if (!hint_queue_out_size_) {
1254 if (DEBUGTRAFFIC)
1255 fprintf(stderr, " ..refill\n");
1256 return bin_t::NONE;
1257 }
1258
1259 while (hint_queue_out_.front().bin.base_length()>size) {
1260 tintbin ret = hint_queue_out_.front();
1261 ret.bin.to_left();
1262 hint_queue_out_.front().bin = ret.bin.sibling();
1263 hint_queue_out_.push_front(ret);
1264 }
1265
1266 bin_t res = hint_queue_out_.front().bin;
1267 hint_queue_out_.pop_front();
1268 hint_queue_out_size_ -= res.base_length();
1269 if (DEBUGTRAFFIC)
1270 fprintf(stderr, " sending %s [%llu]\n",res.str().c_str(),res.base_length());
1271 return res;
1272
1273 }
1274
1275
OnData(struct evbuffer * evb)1276 bin_t Channel::OnData(struct evbuffer *evb) // TODO: HAVE NONE for corrupted data
1277 {
1278
1279 binvector bv = evbuffer_remove_chunkaddr(evb,hs_in_->chunk_addr_);
1280 if (bv.size() == 0 || bv.size() > 1) {
1281 // Chunk spec must denote single chunk
1282 dprintf("%s #%" PRIu32 " ?data bad chunk spec\n",tintstr(),id_);
1283 Close(CLOSE_DO_NOT_SEND);
1284 evbuffer_drain(evb, evbuffer_get_length(evb));
1285 return bin_t::NONE;
1286 }
1287 bin_t pos = bv.front();
1288 tint peer_time = TINT_NEVER;
1289 if (hs_out_->version_ == VER_PPSPP_v1)
1290 peer_time = evbuffer_remove_64be(evb);
1291
1292 // Arno: Assuming DATA last message in datagram
1293 if (evbuffer_get_length(evb) > transfer()->chunk_size()) {
1294 dprintf("%s #%" PRIu32 " !data chunk size mismatch %s: exp %" PRIu32 " got " PRISIZET "\n",tintstr(),id_,
1295 pos.str().c_str(), transfer()->chunk_size(), evbuffer_get_length(evb));
1296 fprintf(stderr,"WARNING: chunk size mismatch: exp %" PRIu32 " got " PRISIZET "\n",transfer()->chunk_size(),
1297 evbuffer_get_length(evb));
1298 }
1299
1300 int length = (evbuffer_get_length(evb) < transfer()->chunk_size()) ? evbuffer_get_length(
1301 evb) : transfer()->chunk_size();
1302 if (!transfer()->ack_out()->is_empty(pos)) {
1303 // Arno, 2012-01-24: print message for duplicate
1304 dprintf("%s #%" PRIu32 " Ddata %s\n",tintstr(),id_,pos.str().c_str());
1305 evbuffer_drain(evb, length);
1306 data_in_ = tintbin(TINT_NEVER,transfer()->ack_out()->cover(pos));
1307
1308 // Arno, 2012-01-24: Make sure data interarrival periods don't get
1309 // screwed up because of these (ignored) duplicates.
1310 UpdateDIP(pos);
1311 return bin_t::NONE;
1312 }
1313
1314 uint8_t *data = evbuffer_pullup(evb, length);
1315
1316 //fprintf(stderr,"OnData: Got chunk %d / %" PRIi64 "\n", length, swift::SeqComplete(transfer()->fd()) );
1317
1318 // SIGNPEAK
1319 if (hashtree() != NULL && (hs_in_->cont_int_prot_ == POPT_CONT_INT_PROT_MERKLE
1320 || hs_in_->cont_int_prot_ == POPT_CONT_INT_PROT_UNIFIED_MERKLE)) {
1321 // Check integrity
1322 if (!hashtree()->OfferData(pos, (char*)data, length)) {
1323 evbuffer_drain(evb, length);
1324 dprintf("%s #%" PRIu32 " !data %s\n",tintstr(),id_,pos.str().c_str());
1325 return bin_t::NONE;
1326 }
1327
1328 // Arno: If we are getting content, keep activated
1329 swift::Touch(transfer()->td());
1330 } else {
1331 // No content integrity checking, just write (TODO SIGN_ALL)
1332 int ret = transfer()->GetStorage()->Write(data,length,pos.base_offset()*transfer()->chunk_size());
1333 if (ret < 0)
1334 print_error("storage Write failed");
1335 else
1336 transfer()->ack_out()->set(pos);
1337 }
1338
1339 evbuffer_drain(evb, length);
1340 dprintf("%s #%" PRIu32 " -data %s\n",tintstr(),id_,pos.str().c_str());
1341
1342 if (DEBUGTRAFFIC)
1343 fprintf(stderr,"$ ");
1344
1345 // Do swift API callbacks
1346 bin_t cover = transfer()->ack_out()->cover(pos);
1347 transfer()->Progress(cover);
1348 //if (cover.layer() >= 5) // Arno: update DL speed. Tested with 32K, presently = 2 ** 5 * chunk_size CHUNKSIZE
1349 // transfer()->OnRecvData( pow((double)2,(double)5)*((double)transfer()->chunk_size()) );
1350 transfer()->OnRecvData(transfer()->chunk_size());
1351
1352 data_in_ = tintbin(NOW,bin_t::NONE);
1353 data_in_.bin = pos;
1354 // Ric: the time of the ack is the owd.
1355 if (peer_time!=TINT_NEVER)
1356 data_in_.time = NOW - peer_time;
1357
1358 UpdateDIP(pos);
1359 CleanHintOut(pos);
1360 bytes_down_ += length;
1361 global_bytes_down += length;
1362
1363 // SIGNPEAK
1364 // Purge hash tree, if desired
1365 if (hs_out_->live_disc_wnd_ != POPT_LIVE_DISC_WND_ALL) {
1366 // Discard parts of tree no longer in window
1367 LiveTransfer *lt = (LiveTransfer *)transfer();
1368 LiveHashTree *umt = (LiveHashTree *)hashtree();
1369 lt->OnDataPruneTree(*hs_out_,pos,umt->GetNChunksPerSig());
1370 }
1371
1372 return pos;
1373 }
1374
1375
UpdateRTT(tint owd)1376 void Channel::UpdateRTT(tint owd)
1377 {
1378
1379 // one-way delay calculations
1380 std::pair <tint,tint> delay(owd, NOW);
1381 owd_current_.push_front(delay);
1382
1383 if (owd_min_bin_start_+ LEDBAT_ROLLOVER < NOW) {
1384 owd_min_bin_start_ = NOW;
1385 owd_min_bin_ = owd_min_bin_ == 9 ? 0 : owd_min_bin_ + 1;
1386 owd_min_bins_[owd_min_bin_] = owd;
1387 } else if (owd_min_bins_[owd_min_bin_]>owd)
1388 owd_min_bins_[owd_min_bin_] = owd;
1389
1390 ack_rcvd_recent_++;
1391
1392 }
1393
UpdateDIP(bin_t pos)1394 void Channel::UpdateDIP(bin_t pos)
1395 {
1396 if (!pos.is_none()) {
1397 if (last_data_in_time_) {
1398 /*
1399 tint dip = NOW - last_data_in_time_;
1400 dip_avg_ = (dip_avg_*7 + dip) >> 3;
1401 */
1402 tint dip = NOW - last_data_in_time_;
1403 std::pair <tint,tint> dip_hist(dip, NOW);
1404 dip_list_.push_front(dip_hist);
1405 dip_avg_ = (dip_avg_*3 + dip) >> 2;
1406 }
1407 last_data_in_time_ = NOW;
1408 }
1409
1410 // RTTFIX
1411 /* Arno: in a true client/server scenario the initial RTT is used to set
1412 * rtt_avg_ and it is never updated. If that gets a bad sample this can
1413 * kill performance. Simple workaround against too high values.
1414 * Ric: we need to prevent also against burst in the network link.
1415 * It also happens that RTT is too small
1416 */
1417 if (rtt_hint_tintbin_.bin == pos && IsComplete()) {
1418 tint diff = NOW - rtt_hint_tintbin_.time;
1419 // Conservative: only adjust rtt_avg_ if 2x smaller
1420 if (diff < rtt_avg_>>1) {
1421 dprintf("%s #%" PRIu32 " rtt adjust %" PRIi64 " -> %" PRIi64 "\n",tintstr(),id_,rtt_avg_,diff);
1422 rtt_avg_ = diff;
1423 }
1424 // Ric: check if our values are wrong!
1425 tint owd = data_in_.time;
1426 if (owd != TINT_NEVER && owd<<2 > rtt_avg_) {
1427 dprintf("%s #%" PRIu32 " rtt adjust %" PRIi64 " -> %" PRIi64 "\n",tintstr(),id_,rtt_avg_,diff);
1428 rtt_avg_ = owd<<2;
1429 }
1430 rtt_hint_tintbin_.bin = bin_t::NONE;
1431 }
1432 }
1433
1434
OnAck(struct evbuffer * evb)1435 void Channel::OnAck(struct evbuffer *evb)
1436 {
1437
1438 binvector bv = evbuffer_remove_chunkaddr(evb,hs_in_->chunk_addr_);
1439 if (bv.size() == 0) {
1440 // Could not parse chunk spec
1441 dprintf("%s #%" PRIu32 " ?ack bad chunk spec\n",tintstr(),id_);
1442 Close(CLOSE_DO_NOT_SEND);
1443 evbuffer_drain(evb, evbuffer_get_length(evb));
1444 return;
1445 }
1446 tint peer_owd = evbuffer_remove_64be(evb);
1447
1448 munro_ack_rcvd_ = true;
1449
1450 binvector::iterator iter;
1451 for (iter=bv.begin(); iter != bv.end(); iter++) {
1452 bin_t ackd_pos = *iter;
1453
1454 // FIXME FIXME: wrap around here
1455 if (ackd_pos.is_none()) // safety catch
1456 return; // likely, broken chunk/ insufficient hashes
1457 if (transfer()->ttype() == FILE_TRANSFER && hashtree() != NULL && hashtree()->size()
1458 && ackd_pos.base_offset()>=hashtree()->size_in_chunks()) {
1459 eprintf("invalid ack: %s\n",ackd_pos.str().c_str());
1460 return;
1461 }
1462 ack_in_.set(ackd_pos);
1463
1464 //fprintf(stderr,"OnAck: got bin %s is_complete %d\n", ackd_pos.str(), (int)ack_in_.is_complete_arno( transfer()->ack_out()->get_height() ));
1465
1466 int di = 0, ri = 0;
1467 // find an entry for the send (data out) event
1468 while (di<data_out_.size() && (data_out_[di]==tintbin() || !ackd_pos.contains(data_out_[di].bin)))
1469 di++;
1470 // FUTURE: delayed acks
1471 // rule out retransmits
1472 // Ric: by ruling out retransmits we screw up ledbat calculations
1473 while (ri<data_out_tmo_.size() && !ackd_pos.contains(data_out_tmo_[ri].bin))
1474 ri++;
1475
1476 dprintf("%s #%" PRIu32 " %cack %s owd:%" PRIi64 "\n",tintstr(),id_,
1477 di==data_out_.size() ? (ri==data_out_tmo_.size() ? '?':'R') : '-',ackd_pos.str().c_str(),peer_owd);
1478
1479 if (di!=data_out_.size()) {
1480 // Ric: FIXME assuming direct sending of acks
1481 tint rtt = NOW-data_out_[di].time;
1482
1483 // Ric: quickly adapt to new network changes! (with large owd samples the previous rtt values influence
1484 //if (owd > rtt_avg_)
1485 // rtt_avg_ = (rtt_avg_*3 + rtt) >> 2;
1486 //else
1487 rtt_avg_ = (rtt_avg_*7 + rtt) >> 3;
1488 dev_avg_ = (dev_avg_*3 + tintabs(rtt-rtt_avg_)) >> 2;
1489 assert(data_out_[di].time!=TINT_NEVER);
1490 dprintf("%s #%" PRIu32 " rtt:%" PRIu64 ", rtt_avg:%" PRIu64 " dev:%" PRIu64 "\n", tintstr(), id_,rtt, rtt_avg_,
1491 dev_avg_);
1492
1493 UpdateRTT(peer_owd);
1494 dprintf("%s #%" PRIu32 " setting null %s\n",tintstr(),id_, data_out_[di].bin.str().c_str());
1495 // early loss detection by packet reordering
1496 /* TODO do we really need it?
1497 for (int re=0; re<di-MAX_REORDERING; re++) {
1498 if (data_out_[re]==tintbin())
1499 continue;
1500 ack_not_rcvd_recent_++;
1501 data_out_size_--;
1502 data_out_tmo_.push_back(data_out_[re].bin);
1503 dprintf("%s #%" PRIu32 " Rdata %s\n",tintstr(),id_,data_out_.front().bin.str().c_str());
1504 data_out_cap_ = bin_t::ALL;
1505 data_out_[re] = tintbin();
1506 }*/
1507 data_out_size_--;
1508 data_out_[di]=tintbin();
1509 } else if (ri!=data_out_tmo_.size()) {
1510 // Ric: TODO test
1511 //UpdateRTT(peer_owd);
1512 data_out_tmo_[ri]=tintbin();
1513 }
1514
1515 }
1516
1517 // clear zeroed items
1518 // TODO: do we really need it?
1519 /*while (!data_out_.empty() && ( data_out_.front()==tintbin() ||
1520 ack_in_.is_filled(data_out_.front().bin) ) ) {
1521 dprintf("%s #%" PRIu32 " removing %s\n",tintstr(),id_, data_out_.front().bin.str().c_str());
1522 data_out_.pop_front();
1523 }
1524 while (!data_out_tmo_.empty() && ( data_out_tmo_.front()==tintbin() ||
1525 ack_in_.is_filled(data_out_tmo_.front().bin) ) )
1526 data_out_tmo_.pop_front();
1527 assert(data_out_.empty() || data_out_.front().time!=TINT_NEVER);*/
1528 }
1529
1530
TimeoutDataOut()1531 void Channel::TimeoutDataOut()
1532 {
1533 // losses: timeouted packets
1534 // Ric: aggressively timeout only if in active transmission (using cc like LEDBAT)
1535 tint timeout = NOW - ack_timeout();
1536 if (send_control_!=LEDBAT_CONTROL)
1537 timeout -= ack_timeout()<<1;
1538
1539 while (!data_out_.empty() &&
1540 (data_out_.front().time<timeout || data_out_.front()==tintbin())) {
1541 if (data_out_.front()!=tintbin() && ack_in_.is_empty(data_out_.front().bin)) {
1542 ack_not_rcvd_recent_++;
1543 data_out_cap_ = bin_t::ALL;
1544 // Ric: keep the original timing... otherwise calculations are wrong once
1545 // we get the ack back
1546 data_out_tmo_.push_back(data_out_.front());
1547 data_out_size_--;
1548 dprintf("%s #%" PRIu32 " Tdata %s\n",tintstr(),id_,data_out_.front().bin.str().c_str());
1549 }
1550 data_out_.pop_front();
1551 }
1552 // clear retransmit queue of older items
1553 while (!data_out_tmo_.empty() && (data_out_tmo_.front()==tintbin()
1554 || data_out_tmo_.front().time<NOW-MAX_POSSIBLE_RTT)) {
1555 data_out_tmo_.pop_front();
1556 //data_out_size_--;
1557 }
1558
1559 // use the same value to clean the delay samples
1560 while (owd_current_.size() > 4 && owd_current_.back().second < timeout) {
1561 owd_current_.pop_back();
1562 }
1563 }
1564
1565
OnHave(struct evbuffer * evb)1566 void Channel::OnHave(struct evbuffer *evb)
1567 {
1568
1569 binvector bv = evbuffer_remove_chunkaddr(evb,hs_in_->chunk_addr_);
1570 if (bv.size() == 0) {
1571 // Could not parse chunk spec
1572 dprintf("%s #%" PRIu32 " ?have bad chunk spec\n",tintstr(),id_);
1573 Close(CLOSE_DO_NOT_SEND);
1574 evbuffer_drain(evb, evbuffer_get_length(evb));
1575 return;
1576 }
1577 binvector::iterator iter;
1578 for (iter=bv.begin(); iter != bv.end(); iter++) {
1579 bin_t ackd_pos = *iter;
1580
1581 if (ackd_pos.is_none()) // safety catch
1582 return; // wow, peer has hashes
1583
1584 // PPPLUG
1585 if (!hashtree()->is_complete() && transfer()->ttype() == FILE_TRANSFER) {
1586 FileTransfer *ft = (FileTransfer *)transfer();
1587
1588 // Ric: update the availability if needed
1589 ft->availability()->set(id_, ack_in_, ackd_pos);
1590 }
1591
1592 ack_in_.set(ackd_pos);
1593 dprintf("%s #%" PRIu32 " -have %s\n",tintstr(),id_,ackd_pos.str().c_str());
1594
1595 if (transfer()->ttype() == LIVE_TRANSFER) {
1596 OnHaveLive(ackd_pos);
1597 }
1598 }
1599 }
1600
1601
OnHaveLive(bin_t ackd_pos)1602 void Channel::OnHaveLive(bin_t ackd_pos)
1603 {
1604 // Arno, 2012-01-09: Provide PiecePicker with info needed for hook-in.
1605 // Arno, 2013-02-14: Could be optimized such that only right-most bin
1606 // is communicated to LivePiecePicker.
1607 LiveTransfer *lt = (LiveTransfer *)transfer();
1608 if (!lt->am_source()) {
1609 if (hs_in_ != NULL && hs_in_->live_disc_wnd_ != POPT_LIVE_DISC_WND_ALL) {
1610 // Filter ack_in_ using live discard window. Peer will only have
1611 // the chunks in that window, so we should not pick outside.
1612
1613 if (ack_in_right_basebin_ == bin_t::NONE || ackd_pos.base_right() > ack_in_right_basebin_) {
1614 // 1. Update last non-empty base bin
1615 ack_in_right_basebin_ = ackd_pos.base_right();
1616
1617 // 2. Calc start of window from last non-empty bin
1618 if (ack_in_right_basebin_.layer_offset() >= hs_in_->live_disc_wnd_) {
1619 bin_t firstbasepos = bin_t(0,ack_in_right_basebin_.layer_offset() - hs_in_->live_disc_wnd_+1);
1620
1621 // 3. Empty all bins before start of window
1622 binvector cbv;
1623 swift::chunk32_to_bin32(0, firstbasepos.layer_offset(), &cbv); // firsbasepos exclusive
1624 binvector::iterator iter;
1625 for (iter=cbv.begin(); iter != cbv.end(); iter++) {
1626 bin_t cpos = *iter;
1627 ack_in_.reset(cpos);
1628 }
1629 dprintf("%s #%" PRIu32 " have window %s %s\n",tintstr(),id_,firstbasepos.str().c_str(),
1630 ack_in_right_basebin_.str().c_str());
1631 }
1632 }
1633 }
1634
1635 // Arno: it can happen that we receive a HAVE and have no hints
1636 // outstanding. In that case we should not wait till next_send_time_
1637 // but request directly. See send_control.cpp
1638 if (hint_out_.size() == 0) {
1639 live_have_no_hint_ = true;
1640 dprintf("%s #%" PRIu32 " have but no hints\n",tintstr(),id_);
1641 }
1642 }
1643 }
1644
1645
OnHint(struct evbuffer * evb)1646 void Channel::OnHint(struct evbuffer *evb)
1647 {
1648
1649 binvector bv = evbuffer_remove_chunkaddr(evb,hs_in_->chunk_addr_);
1650 if (bv.size() == 0) {
1651 // Could not parse chunk spec
1652 dprintf("%s #%" PRIu32 " ?hint bad chunk spec\n",tintstr(),id_);
1653 Close(CLOSE_DO_NOT_SEND);
1654 evbuffer_drain(evb, evbuffer_get_length(evb));
1655 return;
1656 }
1657
1658 binvector::iterator iter;
1659 for (iter=bv.begin(); iter != bv.end(); iter++) {
1660 bin_t hint = *iter;
1661
1662 // Ric: TODO test
1663 tbqueue::iterator it = hint_in_.begin();
1664 while (it != hint_in_.end()) { // removing likely snubbed hints
1665 bin_t b = it->bin;
1666 if (hint.contains(b) || b.contains(hint))
1667 dprintf("%s #%" PRIu32 " hint already requested hint:%s - and:%s!!\n", tintstr(), id_, hint.str().c_str(),
1668 b.str().c_str());
1669 it++;
1670 }
1671
1672 // FIXME: wake up here
1673 hint_in_.push_back(hint);
1674 hint_in_size_ += hint.base_length();
1675 dprintf("%s #%" PRIu32 " -hint %s [%" PRIu64 "]\n",tintstr(),id_,hint.str().c_str(),hint_in_size_);
1676
1677 // SIGNPEAK
1678 //if (hs_in_ != NULL && hs_in_->cont_int_prot_ == POPT_CONT_INT_PROT_UNIFIED_MERKLE)
1679 // RemoveSinceSignedPeakTuples(hint);
1680 }
1681 }
1682
1683 /*
1684 * Read full HANDSHAKE message from evb outside of a Channel context.
1685 */
StaticOnHandshake(Address & addr,uint32_t cid,bool ver_known,popt_version_t ver,struct evbuffer * evb)1686 Handshake *Channel::StaticOnHandshake(Address &addr, uint32_t cid, bool ver_known, popt_version_t ver,
1687 struct evbuffer *evb)
1688 {
1689 dprintf("StaticOnHandshake: %s id %" PRIu32 " known %d ver %d\n", addr.str().c_str(), cid, (int)ver_known, ver);
1690
1691 Handshake *hs = new Handshake();
1692
1693 if (!ver_known) {
1694 uint8_t msgid = evbuffer_remove_8(evb);
1695 if (msgid == SWIFT_INTEGRITY)
1696 ver = VER_SWIFT_LEGACY;
1697 else if (msgid == SWIFT_HANDSHAKE)
1698 ver = VER_PPSPP_v1;
1699 else {
1700 dprintf("%s #%" PRIu32 " ?hs unknown protocol %d %s\n", tintstr(),cid,msgid,addr.str().c_str());
1701 delete hs;
1702 return NULL;
1703 }
1704 }
1705
1706 if (ver == VER_SWIFT_LEGACY) {
1707 //dprintf("%s #%" PRIu32 " -hs swift legacy\n", tintstr(), cid );
1708 hs->version_ = VER_SWIFT_LEGACY;
1709 if (cid == 0) {
1710 // He is initiating. Initiating handshake has SWIFT_HASH + root hash, reply doesn't
1711 if (evbuffer_get_length(evb)<4+1+4+Sha1Hash::SIZE) {
1712 dprintf("%s #0 incorrect size %i initial handshake packet %s\n", tintstr(),(int)evbuffer_get_length(evb),
1713 addr.str().c_str());
1714 delete hs;
1715 return NULL;
1716 }
1717 bin_t pos = bin_fromUInt32(evbuffer_remove_32be(evb));
1718 if (!pos.is_all()) {
1719 dprintf("%s #%" PRIu32 " ?hs that is not the root hash %s\n",tintstr(), cid, addr.str().c_str());
1720 delete hs;
1721 return NULL;
1722 }
1723 Sha1Hash roothash = evbuffer_remove_hash(evb);
1724 SwarmID swarmid(roothash);
1725 hs->SetSwarmID(swarmid);
1726 dprintf("%s #%" PRIu32 " -hash ALL %s\n",tintstr(),cid,swarmid.hex().c_str());
1727 }
1728
1729 // Read SWIFT_HANDSHAKE
1730 uint8_t msgid = evbuffer_remove_8(evb);
1731 hs->peer_channel_id_ = evbuffer_remove_32be(evb);
1732 hs->ResetToLegacy();
1733
1734 dprintf("%s #%" PRIu32 " -hs swift %x\n",tintstr(),cid,hs->peer_channel_id_);
1735 } else if (ver == VER_PPSPP_v1) {
1736 // IETF PPSP compliant
1737 dprintf("%s #%" PRIu32 " -hs ietf ppsp\n", tintstr(),cid);
1738 hs->peer_channel_id_ = evbuffer_remove_32be(evb);
1739 bool end=false;
1740 uint8_t size8 = 0, i8=0;
1741 uint16_t size = 0;
1742 uint8_t *swarmidbytes = NULL;
1743 uint8_t *msgbitmapbytes = NULL;
1744 SwarmID swarmid;
1745 std::ostringstream cross;
1746 while (!end && evbuffer_get_length(evb) > 0) {
1747 popt_t poid = (popt_t)evbuffer_remove_8(evb);
1748 //dprintf("%s #%" PRIu32 " -hs popt %d\n", tintstr(),cid, (int)poid );
1749 switch (poid) {
1750 case POPT_VERSION:
1751 hs->version_ = (popt_version_t)evbuffer_remove_8(evb);
1752 cross << "v" << hs->version_ << " ";
1753 break;
1754 case POPT_MIN_VERSION:
1755 hs->min_version_ = (popt_version_t)evbuffer_remove_8(evb);
1756 cross << "nv" << hs->min_version_ << " ";
1757 break;
1758 case POPT_SWARMID:
1759 size = evbuffer_remove_16be(evb);
1760 if (size > POPT_MAX_SWARMID_SIZE || evbuffer_get_length(evb) < size) {
1761 dprintf("%s #%" PRIu32 " ?hs popt swarmid too big\n",tintstr(),cid);
1762 delete hs;
1763 return NULL;
1764 }
1765 swarmidbytes = new uint8_t[size];
1766 evbuffer_remove(evb,swarmidbytes,size);
1767 swarmid = SwarmID(swarmidbytes,size);
1768 delete swarmidbytes;
1769 hs->SetSwarmID(swarmid);
1770 cross << "sid " << swarmid.hex() << " ";
1771 break;
1772 case POPT_CONT_INT_PROT:
1773 hs->cont_int_prot_ = (popt_cont_int_prot_t)evbuffer_remove_8(evb);
1774 cross << "cipm " << hs->cont_int_prot_ << " ";
1775 break;
1776 case POPT_MERKLE_HASH_FUNC:
1777 hs->merkle_func_ = (popt_merkle_func_t)evbuffer_remove_8(evb);
1778 cross << "mhf " << hs->merkle_func_ << " ";
1779 break;
1780 case POPT_LIVE_SIG_ALG:
1781 hs->live_sig_alg_ = (popt_live_sig_alg_t)evbuffer_remove_8(evb);
1782 cross << "lsa " << hs->live_sig_alg_ << " ";
1783 break;
1784 case POPT_CHUNK_ADDR:
1785 hs->chunk_addr_ = (popt_chunk_addr_t)evbuffer_remove_8(evb);
1786 cross << "cam " << hs->chunk_addr_ << " ";
1787 break;
1788 case POPT_LIVE_DISC_WND:
1789 if (hs->chunk_addr_ == POPT_CHUNK_ADDR_BIN32 || hs->chunk_addr_ == POPT_CHUNK_ADDR_CHUNK32)
1790 hs->live_disc_wnd_ = evbuffer_remove_32be(evb);
1791 else
1792 hs->live_disc_wnd_ = evbuffer_remove_64be(evb);
1793 cross << "ldw " << std::hex << hs->live_disc_wnd_ << std::dec << " ";
1794 break;
1795 case POPT_SUPP_MSGS:
1796 size8 = evbuffer_remove_8(evb);
1797 if (size8 > 8 || evbuffer_get_length(evb) < size8) {
1798 dprintf("%s #%" PRIu32 " ?hs popt supp msgs too big\n",tintstr(),cid);
1799 delete hs;
1800 return NULL;
1801 }
1802 msgbitmapbytes = evbuffer_pullup(evb,size8);
1803 evbuffer_drain(evb, size);
1804 cross << "msgs " << std::hex;
1805 for (i8=0; i8<size8; i8++)
1806 cross << (int)msgbitmapbytes[i8];
1807 cross << std::dec;
1808 break;
1809 case POPT_END:
1810 end = true;
1811 dprintf("%s #%" PRIu32 " -hs %x ppsp %s\n", tintstr(), cid, hs->peer_channel_id_, cross.str().c_str());
1812 break;
1813 default:
1814 dprintf("%s #%" PRIu32 " ?hs popt id unknown %i\n",tintstr(),cid,(int)poid);
1815 delete hs;
1816 return NULL;
1817 }
1818 }
1819 }
1820
1821 return hs;
1822 }
1823
OnHandshake(Handshake * hishs)1824 void Channel::OnHandshake(Handshake *hishs)
1825 {
1826
1827 dprintf("OnHandshake\n");
1828
1829 if (hishs->peer_channel_id_ == 0) {
1830 // Arno: Got explicit close from peer, close channel and don't send reply
1831 dprintf("%s #%" PRIu32 " -hs close\n",tintstr(),id_);
1832 Close(CLOSE_DO_NOT_SEND);
1833 delete hishs;
1834 return;
1835 } else
1836 dprintf("%s #%" PRIu32 " -hs %x %s opened as channel %" PRIu32 "\n",tintstr(),id_,hishs->peer_channel_id_,
1837 (hishs->version_ == VER_SWIFT_LEGACY) ? "swift" : "ppsp", id_);
1838
1839 if (!hishs->IsSupported()) {
1840 dprintf("%s #%" PRIu32 " -hs unsupported\n",tintstr(),id_);
1841 Close(CLOSE_SEND);
1842 delete hishs;
1843 return;
1844 }
1845
1846 hs_in_ = hishs;
1847 hs_in_->ReleaseSwarmID(); // save mem per channel
1848
1849 // Self-connection check
1850 if (!SELF_CONN_OK) {
1851 uint32_t try_id = DecodeID(hishs->peer_channel_id_);
1852 // Arno, 2012-05-29: Fixed duplicate test
1853 if (channel(try_id) == this) {
1854 // this is a self-connection
1855 dprintf("%s #%" PRIu32 " -hs closing self\n",tintstr(),id_);
1856 Close(CLOSE_SEND);
1857 hs_in_ = NULL;
1858 delete hishs;
1859 return;
1860 }
1861 }
1862
1863 if (hs_in_->version_ == VER_SWIFT_LEGACY)
1864 hs_out_->ResetToLegacy(); // he speaks legacy, so will I
1865
1866 // FUTURE: channel forking
1867 if (is_established()) // when this was reply to our HS
1868 dprintf("%s #%" PRIu32 " established %s\n", tintstr(), id_, peer().str().c_str());
1869 }
1870
1871
OnCancel(struct evbuffer * evb)1872 void Channel::OnCancel(struct evbuffer *evb)
1873 {
1874
1875 binvector bv = evbuffer_remove_chunkaddr(evb,hs_in_->chunk_addr_);
1876 if (bv.size() == 0) {
1877 // Could not parse chunk spec
1878 dprintf("%s #%" PRIu32 " ?cancel bad chunk spec\n",tintstr(),id_);
1879 Close(CLOSE_DO_NOT_SEND);
1880 evbuffer_drain(evb, evbuffer_get_length(evb));
1881 return;
1882 }
1883
1884 // Arno, 2012-11-23: chunkaddr translated to list of bins, iterate and
1885 // process in two ways:
1886 // 1. Remove bins from hint_in_ that are contained in the cancelled bins.
1887 // A hint may have been partially answered so it may be split into
1888 // smaller bins.
1889 // 2. Split up bins from hint_in_ that have only been partially canceled.
1890 // i.e., where cancelbin is contained by a bin in hint_in_
1891 //
1892 // If the hint is already in progress (i.e, already transmitted, not yet
1893 // acked), we let it be.
1894 //
1895 binvector::iterator iter;
1896 for (iter=bv.begin(); iter != bv.end(); iter++) {
1897 bin_t cancelbin = *iter;
1898 dprintf("%s #%" PRIu32 " -cancel %s\n",tintstr(),id_,cancelbin.str().c_str());
1899
1900 // 1. Remove hint from hint_in_ if contained in cancelbin. Use Riccardo's solution:
1901 int hi = 0;
1902 while (hi<hint_in_.size() && !cancelbin.contains(hint_in_[hi].bin) && cancelbin != hint_in_[hi].bin)
1903 hi++;
1904
1905 // something to cancel?
1906 if (hi != hint_in_.size()) {
1907 // Assumption: all fragments of a bin being cancelled consecutive in hint_in_
1908 do {
1909 //dprintf("%s #%" PRIu32 " -cancel frag %s\n",tintstr(),id_,hint_in_[hi].bin.str().c_str());
1910 hint_in_size_ -= hint_in_[hi].bin.base_length();
1911 hint_in_.erase(hint_in_.begin()+hi);
1912 if (hint_in_.size() == 0 || hi >= hint_in_.size())
1913 break;
1914 } while (cancelbin.contains(hint_in_[hi].bin));
1915 }
1916
1917 //dprintf("%s #%" PRIu32 " -cancel ORIG %s len %d\n",tintstr(),id_,cancelbin.str().c_str(), hint_in_.size() );
1918
1919 // 2. Fragment hint from hint_in_ if it covers cancelbin. Use Riccardo's solution:
1920 hi = 0;
1921 while (hi<hint_in_.size() && !hint_in_[hi].bin.contains(cancelbin))
1922 hi++;
1923
1924 // nothing to cancel
1925 if (hi==hint_in_.size())
1926 continue;
1927
1928 // Split up hint
1929 tint origt = hint_in_[hi].time;
1930 bin_t origbin = hint_in_[hi].bin;
1931 binvector fragbins = swift::bin_fragment(origbin,cancelbin);
1932 // Erase original
1933 hint_in_size_ -= hint_in_[hi].bin.base_length();
1934 hint_in_.erase(hint_in_.begin()+hi);
1935 // Replace with fragments left
1936 binvector::iterator iter2;
1937 int idx=0;
1938 for (iter2=fragbins.begin(); iter2 != fragbins.end(); iter2++) {
1939 bin_t fragbin = *iter2;
1940 //dprintf("%s #%" PRIu32 " -cancel keep %s\n",tintstr(),id_,fragbin.str().c_str());
1941 tintbin newtb(origt,fragbin);
1942 hint_in_size_ += fragbin.base_length();
1943 hint_in_.insert(hint_in_.begin()+hi+idx,newtb);
1944 idx++;
1945 }
1946 }
1947
1948 for (int i=0; i<hint_in_.size(); i++) {
1949 dprintf("%s #%" PRIu32 " -cancel NETS %s\n",tintstr(),id_,hint_in_[i].bin.str().c_str());
1950 }
1951 }
1952
1953
OnPexAdd(struct evbuffer * evb,int family)1954 void Channel::OnPexAdd(struct evbuffer *evb, int family)
1955 {
1956 Address addr = evbuffer_remove_pexaddr(evb,family);
1957 dprintf("%s #%" PRIu32 " -pex %s %s\n",tintstr(),id_,(addr.get_family() == AF_INET) ? "v4" : "v6", addr.str().c_str());
1958
1959 if (transfer()->OnPexIn(addr))
1960 useless_pex_count_ = 0;
1961 else {
1962 dprintf("%s #%" PRIu32 " already channel to %s\n", tintstr(),id_,addr.str().c_str());
1963 useless_pex_count_++;
1964 }
1965 pex_request_outstanding_ = false;
1966 }
1967
1968
OnPexAddCert(struct evbuffer * evb)1969 void Channel::OnPexAddCert(struct evbuffer *evb)
1970 {
1971 OnPexAddCertZeroState(evb);
1972 dprintf("%s #%" PRIu32 " -pex cert\n",tintstr(),id_);
1973 }
1974
1975
OnChoke(struct evbuffer * evb)1976 void Channel::OnChoke(struct evbuffer *evb)
1977 {
1978 if (hs_in_->version_ == VER_SWIFT_LEGACY) { // FRAGRAND support
1979 evbuffer_remove_32be(evb); // read 4 random bytes
1980 return;
1981 }
1982
1983 //PPSPTODO
1984 dprintf("%s #%" PRIu32 " -choke\n",tintstr(),id_);
1985 }
1986
OnUnchoke(struct evbuffer * evb)1987 void Channel::OnUnchoke(struct evbuffer *evb)
1988 {
1989 //PPSPTODO
1990 dprintf("%s #%" PRIu32 " -unchoke\n",tintstr(),id_);
1991 }
1992
1993
OnSignedHash(struct evbuffer * evb)1994 void Channel::OnSignedHash(struct evbuffer *evb)
1995 {
1996 if (transfer()->ttype() != LIVE_TRANSFER) {
1997 dprintf("%s #%" PRIu32 " ?sigh not live swarm\n",tintstr(),id_);
1998 Close(CLOSE_DO_NOT_SEND);
1999 evbuffer_drain(evb, evbuffer_get_length(evb));
2000 return;
2001 }
2002
2003 binvector bv = evbuffer_remove_chunkaddr(evb,hs_in_->chunk_addr_);
2004 if (bv.size() == 0 || bv.size() > 1) {
2005 // chunk spec for hash must be power-of-2 range, so must fit in single bin
2006 dprintf("%s #%" PRIu32 " ?sigh bad chunk spec\n",tintstr(),id_);
2007 Close(CLOSE_DO_NOT_SEND);
2008 evbuffer_drain(evb, evbuffer_get_length(evb));
2009 return;
2010 }
2011 bin_t pos = bv.front();
2012
2013 tint source_tint = evbuffer_remove_64be(evb);
2014
2015 uint16_t siglen = SWIFT_CIPM_NONE_SIGLEN;
2016 LiveHashTree *umt = (LiveHashTree *)hashtree();
2017 if (umt != NULL)
2018 siglen = umt->GetSigSizeInBytes();
2019
2020 uint8_t *sigdata = new uint8_t[siglen];
2021 if (sigdata == NULL) {
2022 dprintf("%s #%" PRIu32 " ?sigh no mem siglen %" PRIu32 "\n",tintstr(),id_,siglen);
2023 Close(CLOSE_DO_NOT_SEND);
2024 evbuffer_drain(evb, evbuffer_get_length(evb));
2025 return;
2026 }
2027 evbuffer_remove(evb,sigdata,siglen);
2028 Signature sig(sigdata,siglen);
2029 delete sigdata;
2030
2031 // Don't accept signed hashes from others as source
2032 LiveTransfer *lt = (LiveTransfer *)transfer();
2033 if (lt->am_source())
2034 return;
2035
2036 if (umt != NULL) {
2037 SigTintTuple sigtint(sig,source_tint);
2038
2039 bool newverified = umt->OfferSignedMunroHash(pos,sigtint);
2040 if (!newverified)
2041 dprintf("%s #%" PRIu32 " !sigh %s\n",tintstr(),id_,pos.str().c_str());
2042 else {
2043 if (source_tint+(SWIFT_LIVE_MAX_SOURCE_DIVERGENCE_TIME*TINT_SEC) < NOW)
2044 dprintf("%s #%" PRIu32 " *sigh %s\n",tintstr(),id_,pos.str().c_str()); // outdated Sig
2045 else {
2046 dprintf("%s #%" PRIu32 " -sigh %s\n",tintstr(),id_,pos.str().c_str());
2047
2048 LiveTransfer *lt = (LiveTransfer *)transfer();
2049 lt->OnVerifiedMunroHash(pos,source_tint);
2050 }
2051 }
2052 } else if (hs_in_->cont_int_prot_ == POPT_CONT_INT_PROT_NONE) {
2053 dprintf("%s #%" PRIu32 " -sigh %s\n",tintstr(),id_,pos.str().c_str());
2054
2055 LiveTransfer *lt = (LiveTransfer *)transfer();
2056 lt->OnVerifiedMunroHash(pos,source_tint);
2057 } else {
2058 dprintf("%s #%" PRIu32 " ?sigh %s\n",tintstr(),id_,pos.str().c_str());
2059 }
2060 }
2061
2062
2063 /*
2064 * Sending messages
2065 */
2066
AddPex(struct evbuffer * evb)2067 void Channel::AddPex(struct evbuffer *evb)
2068 {
2069 // Gertjan fix: Reverse PEX
2070 // PEX messages sent to facilitate NAT/FW puncturing get priority
2071 if (!reverse_pex_out_.empty()) {
2072 do {
2073 tintbin pex_peer = reverse_pex_out_.front();
2074 reverse_pex_out_.pop_front();
2075 if (channels[(int) pex_peer.bin.toUInt()] == NULL)
2076 continue;
2077 Address a = channels[(int) pex_peer.bin.toUInt()]->peer();
2078 // Arno, 2012-02-28: Don't send private addresses to non-private peers.
2079 if (!a.is_private() || (a.is_private() && peer().is_private())) {
2080 evbuffer_add_pexaddr(evb, a);
2081 dprintf("%s #%" PRIu32 " +pex (reverse) %s\n",tintstr(),id_,a.str().c_str());
2082 }
2083 } while (!reverse_pex_out_.empty() && (SWIFT_MAX_NONDATA_DGRAM_SIZE-evbuffer_get_length(evb)) >= 7);
2084
2085 // Arno: 2012-02-23: Don't think this is right. Bit of DoS thing,
2086 // that you only get back the addr of people that got your addr.
2087 // Disable for now.
2088 //return;
2089 }
2090
2091 if (!pex_requested_)
2092 return;
2093
2094 // Arno, 2012-02-28: Don't send private addresses to non-private peers.
2095 int tries=0;
2096 Channel *c=NULL;
2097 Address a;
2098 while (true) {
2099 // Arno, 2011-10-03: Choosing Gertjan's RandomChannel over RevealChannel here.
2100 c = transfer()->RandomChannel(this);
2101 if (c == NULL || tries > 5) {
2102 pex_requested_ = false;
2103 return;
2104 }
2105 a = c->peer();
2106 if (!a.is_private() || (a.is_private() && peer().is_private()))
2107 break;
2108 tries++;
2109 }
2110
2111 evbuffer_add_pexaddr(evb, a);
2112 dprintf("%s #%" PRIu32 " +pex %s\n",tintstr(),id_,a.str().c_str());
2113
2114 pex_requested_ = false;
2115 /* Ensure that we don't add the same id to the reverse_pex_out_ queue
2116 more than once. */
2117 int chid = c->id();
2118 for (tbqueue::iterator i = channels[chid]->reverse_pex_out_.begin();
2119 i != channels[chid]->reverse_pex_out_.end(); i++)
2120 if ((int)(i->bin.toUInt()) == id_)
2121 return;
2122
2123 dprintf("%s #%" PRIu32 " adding pex for channel %" PRIu32 " at time %s\n", tintstr(), chid,
2124 id_, tintstr(NOW + 2 * TINT_SEC));
2125 // Arno, 2011-10-03: should really be a queue of (tint,channel id(= uint32_t)) pairs.
2126 channels[chid]->reverse_pex_out_.push_back(tintbin(NOW + 2 * TINT_SEC, bin_t(id_)));
2127 if (channels[chid]->send_control_ == KEEP_ALIVE_CONTROL &&
2128 channels[chid]->next_send_time_ > NOW + 2 * TINT_SEC)
2129 channels[chid]->Reschedule();
2130 }
2131
2132
2133
OnPexReq(void)2134 void Channel::OnPexReq(void)
2135 {
2136 dprintf("%s #%" PRIu32 " -pex req\n", tintstr(), id_);
2137 if (NOW > MIN_PEX_REQUEST_INTERVAL + last_pex_request_time_)
2138 pex_requested_ = true;
2139 }
2140
AddPexReq(struct evbuffer * evb)2141 void Channel::AddPexReq(struct evbuffer *evb)
2142 {
2143 // Rate limit the number of PEX requests
2144 if (NOW < next_pex_request_time_)
2145 return;
2146
2147 // If no answer has been received from a previous request, count it as useless
2148 if (pex_request_outstanding_)
2149 useless_pex_count_++;
2150
2151 pex_request_outstanding_ = false;
2152
2153 // Initiate at most SWIFT_MAX_CONNECTIONS connections
2154 if (transfer()->GetChannels()->size() >= SWIFT_MAX_OUTGOING_CONNECTIONS ||
2155 // Check whether this channel has been providing useful peer information
2156 useless_pex_count_ > 2) {
2157 // Arno, 2012-02-23: Fix: Code doesn't recover from useless_pex_count_ > 2,
2158 // let's just try again in 30s
2159 useless_pex_count_ = 0;
2160 next_pex_request_time_ = NOW + 30 * TINT_SEC;
2161
2162 return;
2163 }
2164
2165 dprintf("%s #%" PRIu32 " +pex req\n", tintstr(), id_);
2166 evbuffer_add_8(evb, SWIFT_PEX_REQ);
2167 /* Add a little more than the minimum interval, such that the other party is
2168 less likely to drop it due to too high rate */
2169 next_pex_request_time_ = NOW + MIN_PEX_REQUEST_INTERVAL * 1.1;
2170 pex_request_outstanding_ = true;
2171 }
2172
2173
2174
2175 /*
2176 * Channel class methods
2177 */
2178
LibeventReceiveCallback(evutil_socket_t fd,short event,void * arg)2179 void Channel::LibeventReceiveCallback(evutil_socket_t fd, short event, void *arg)
2180 {
2181 // Called by libevent when a datagram is received on the socket
2182 Time();
2183 dprintf("%s recv callback\n",tintstr());
2184
2185 RecvDatagram(fd);
2186 event_add(&evrecv, NULL);
2187 }
2188
RecvDatagram(evutil_socket_t socket)2189 void Channel::RecvDatagram(evutil_socket_t socket)
2190 {
2191 struct evbuffer *evb = evbuffer_new();
2192 Address addr;
2193 Handshake *hishs = NULL;
2194
2195 RecvFrom(socket, addr, evb);
2196 size_t evboriglen = evbuffer_get_length(evb);
2197
2198 dprintf("%s recvdgram " PRISIZET "\n",tintstr(),evboriglen);
2199
2200 //#define return_log(...) { fprintf(stderr,__VA_ARGS__); evbuffer_free(evb); return; }
2201 #define return_log(...) { dprintf(__VA_ARGS__); evbuffer_free(evb); if (hishs != NULL) { delete hishs; } return; }
2202 if (evbuffer_get_length(evb)<4)
2203 return_log("socket layer weird: datagram < 4 bytes from %s (prob ICMP unreach)\n",addr.str().c_str());
2204
2205 uint32_t mych = evbuffer_remove_32be(evb);
2206
2207 Channel* channel = NULL;
2208 if (mych==0) { // peer initiates handshake
2209
2210 hishs = StaticOnHandshake(addr,0,false,VER_PPSPP_v1,evb);
2211 if (hishs == NULL) // dprintf already called
2212 return_log("%s #0 ?hs bad\n",tintstr());
2213
2214 SwarmID swarmid = hishs->GetSwarmID();
2215 int td = swift::Find(swarmid,true); // Activate
2216 if (td < 0) {
2217 // No known swarm, check if available as zero state
2218 ZeroState *zs = ZeroState::GetInstance();
2219 td = zs->Find(swarmid.roothash());
2220 if (td == -1) {
2221 // Don't reply to strangers knocking
2222 //StaticSendClose(socket,addr,hishs->peer_channel_id_);
2223 return_log("%s #0 swarm %s unknown, requested by %s\n",tintstr(),hishs->GetSwarmID().hex().c_str(),addr.str().c_str());
2224 }
2225 }
2226 ContentTransfer *ct = swift::GetActivatedTransfer(td);
2227 if (ct == NULL) {
2228 StaticSendClose(socket,addr,hishs->peer_channel_id_);
2229 return_log("%s #0 swarm %s known, couldn't be activated; requested by %s\n", tintstr(),
2230 hishs->GetSwarmID().hex().c_str(), addr.str().c_str());
2231 } else if (!ct->IsOperational()) {
2232 // Activated, but broken
2233 StaticSendClose(socket,addr,hishs->peer_channel_id_);
2234 return_log("%s #0 swarm %s broken, requested by %s\n",tintstr(),hishs->GetSwarmID().hex().c_str(),addr.str().c_str());
2235 }
2236
2237 // Arno, 2012-02-27: Check for duplicate channel
2238 Channel* existchannel = ct->FindChannel(addr,NULL);
2239 if (existchannel != NULL) {
2240 // Arno: 2011-10-13: Ignore if established, otherwise consider
2241 // it a concurrent connection attempt.
2242 if (existchannel->is_established()) {
2243 // ARNOTODO: Read complete handshake here so we know whether
2244 // attempt is to new channel or to existing. Currently read
2245 // in OnHandshake()
2246 //
2247 // Arno, 2012-12-17: in Android app peers have hardwired port
2248 // so this happens often. Assuming that sender has reasons
2249 // to rehandshake, now just close old.
2250 dprintf("%s #0 have a channel already to %s, closing old\n",tintstr(),addr.str().c_str());
2251
2252 // Arno, 2012-12-17: On Android closing the channel causes swift
2253 // to crash. On Win32 I don't see this behaviour. For now, let
2254 // the channel die out by itself. The sender will not accept
2255 // the datagrams sent by this peer on the old channel because
2256 // it doesn't know the old channel ID.
2257 // existchannel->Close(CLOSE_DO_NOT_SEND);
2258 channel = NULL;
2259 } else {
2260 channel = existchannel;
2261 //fprintf(stderr,"Channel::RecvDatagram: HANDSHAKE: reuse channel %s\n", channel->peer_.str() );
2262 }
2263 }
2264 if (channel == NULL) {
2265 if (ct->GetChannels()->size() < SWIFT_MAX_INCOMING_CONNECTIONS) {
2266 //fprintf(stderr,"Channel::RecvDatagram: HANDSHAKE: create new channel %s\n", addr.str().c_str() );
2267 channel = new Channel(ct, socket, addr);
2268 } else {
2269 // Too many connections
2270 StaticSendClose(socket,addr,hishs->peer_channel_id_);
2271 return_log("%s #0 swarm %s too many connections, requested by %s\n",tintstr(),hishs->GetSwarmID().hex().c_str(),
2272 addr.str().c_str());
2273 }
2274 }
2275 //fprintf(stderr,"CHANNEL INCOMING DEF hass %s is id %d\n",hishs->GetSwarmID().hex().c_str(),channel->id());
2276
2277 channel->OnHandshake(hishs);
2278
2279 } else if (CmdGwTunnelCheckChannel(mych)) {
2280 // SOCKTUNNEL
2281 CmdGwTunnelUDPDataCameIn(addr,mych,evb);
2282 evbuffer_free(evb);
2283 return;
2284 } else { // peer responds to my handshake (and other messages)
2285 mych = DecodeID(mych);
2286 if (mych>=channels.size())
2287 return_log("%s invalid channel #%" PRIu32 ", %s\n",tintstr(),mych,addr.str().c_str());
2288 channel = channels[mych];
2289 if (!channel)
2290 return_log("%s #%" PRIu32 " is already closed\n",tintstr(),mych);
2291 if (channel->IsDiffSenderOrDuplicate(addr,mych)) {
2292 dprintf("%s #%" PRIu32 " ?channel diff or dup\n",tintstr(),mych);
2293 channel->Close(CLOSE_SEND_IF_ESTABLISHED);
2294 delete channel; // safe, not in a channel event
2295 return_log("%s #%" PRIu32 " is duplicate\n",tintstr(),mych);
2296 }
2297 channel->own_id_mentioned_ = true;
2298 }
2299 channel->raw_bytes_down_ += evboriglen;
2300 //dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
2301 bool wasestablished = channel->is_established();
2302
2303 //dprintf("%s #%" PRIu32 " peer %s recv_peer %s addr %s\n", tintstr(),mych, channel->peer().str().c_str(), channel->recv_peer().str(), addr.str() );
2304
2305 // Process messages
2306 if (channel->send_control_!=CLOSE_CONTROL)
2307 channel->Recv(evb);
2308
2309 evbuffer_free(evb);
2310
2311 //SAFECLOSE
2312 if (channel->send_control_==CLOSE_CONTROL) {
2313 // Arno, 2012-07-27: Received an explict close, clean up channel
2314 delete channel;
2315 }
2316 }
2317
2318
2319
2320 /*
2321 * Channel instance methods
2322 */
2323
CloseChannelByAddress(const Address & addr)2324 void Channel::CloseChannelByAddress(const Address &addr)
2325 {
2326 // fprintf(stderr,"CloseChannelByAddress: address is %s\n", addr.str().c_str() );
2327
2328 dprintf("%s #-1 close channel by address %s\n",tintstr(), addr.str().c_str());
2329 channels_t::iterator iter;
2330 for (iter = channels.begin(); iter != channels.end(); iter++) {
2331 Channel *c = *iter;
2332 if (c != NULL && c->peer_ == addr) {
2333 dprintf("%s #%" PRIu32 " close by addr\n",tintstr(),c->id());
2334 c->Close(CLOSE_DO_NOT_SEND);
2335 delete c; // safe, not in a send event, and doesn't modify channels
2336 break;
2337 }
2338 }
2339 }
2340
Close(close_send_t closesend)2341 void Channel::Close(close_send_t closesend)
2342 {
2343
2344 dprintf("%s #%" PRIu32 " closing channel\n",tintstr(),id_);
2345
2346 lprintf("\t\t==== Switch to Close Control ==== \n");
2347 this->SwitchSendControl(CLOSE_CONTROL);
2348
2349 if (closesend == CLOSE_SEND || (closesend == CLOSE_SEND_IF_ESTABLISHED && is_established()))
2350 this->Send(); // Arno: send explicit close
2351
2352 if (hs_in_ != NULL) // is_established() -> false
2353 hs_in_->peer_channel_id_ = 0;
2354
2355 //LIVE
2356 if (transfer()->ttype() == FILE_TRANSFER) {
2357 FileTransfer *ft = (FileTransfer *)transfer();
2358 if (!ft->IsZeroState()
2359 && ft->availability() != NULL) { // availability() is NULL when this is called from ContentTransfer/CloseChannels()
2360 // Ric: remove its binmap from the availability
2361 ft->availability()->removeBinmap(id_, ack_in_);
2362 }
2363 }
2364
2365 // SAFECLOSE
2366 // Arno: ensure LibeventSendCallback is no longer called with ptr to this Channel
2367 ClearEvents();
2368 }
2369
2370
2371
Reschedule()2372 void Channel::Reschedule()
2373 {
2374
2375 // Arno: CAREFUL: direct send depends on diff between next_send_time_ and
2376 // NOW to be 0, so any calls to Time in between may put things off. Sigh.
2377 Time();
2378
2379 if (evsend_ptr_ == NULL) {
2380 dprintf("%s #%" PRIu32 " cannot requeue for %s, closed\n",tintstr(),id_,tintstr(next_send_time_));
2381 return;
2382 }
2383 struct timeval currtv;
2384 // remove pending events if in keep-alive mode
2385 if (send_control_ == KEEP_ALIVE_CONTROL && evtimer_pending(evsend_ptr_, &currtv)) {
2386 evtimer_del(evsend_ptr_);
2387 }
2388
2389 dprintf("%s schedule\n",tintstr());
2390
2391 // Ric: before rescheduling check if we already have scheduled it in the past.
2392 // calculate the delay only if the reschedule has been called after a send, ignore reschedules
2393 // triggered by something received.
2394 if (last_send_time_>next_send_time_ && next_send_time_<NOW && send_control_ == LEDBAT_CONTROL) {
2395 dprintf("%s #%" PRIu32 " Already something scheduled for: %s\n",tintstr(),id_, tintstr(next_send_time_));
2396 reschedule_delay_ = NOW - next_send_time_;
2397 dprintf("%s #%" PRIu32 " reschedule delay :%" PRIi64 "\n",tintstr(),id_,reschedule_delay_);
2398 }
2399
2400 next_send_time_ = NextSendTime();
2401 dprintf("%s #%" PRIu32 " next send in :%" PRIi64 "\n",tintstr(),id_,next_send_time_-NOW);
2402 if (next_send_time_!=TINT_NEVER) {
2403
2404 assert(next_send_time_<NOW+TINT_MIN);
2405 tint duein = next_send_time_-NOW;
2406
2407 if (duein <= 0) {
2408 // Arno, 2011-10-18: libevent's timer implementation appears to be
2409 // really slow, i.e., timers set for 100 usec from now get called
2410 // at least two times later :-( Hence, for sends after receives
2411 // perform them directly.
2412 // Ric: The timer error of libevent can be propagated throughout the
2413 // download.. keep track of the delay error!
2414 dprintf("%s #%" PRIu32 " requeue direct send (%s)\n",tintstr(),id_, duein<=0 ? "duein" : "direct sending");
2415 //next_send_time_ = NOW;
2416
2417 LibeventSendCallback(-1,EV_TIMEOUT,this);
2418 } else {
2419 struct timeval duetv = *tint2tv(duein);
2420 evtimer_add(evsend_ptr_,&duetv);
2421 dprintf("%s #%" PRIu32 " requeue for %s in %" PRIi64 "\n",tintstr(),id_,tintstr(next_send_time_), duein);
2422 }
2423 } else {
2424 // SAFECLOSE
2425 dprintf("%s #%" PRIu32 " resched, will close\n",tintstr(),id_);
2426 // Arno: Cannot clean up send events or channel here as we may be
2427 // LibeventSendCallback() on a specific channel. Do on another event,
2428 // see ContentTransfer::LibeventCleanCallback()
2429 this->Schedule4Delete();
2430 }
2431 }
2432
2433
2434 /*
2435 * Channel class methods
2436 */
LibeventSendCallback(int fd,short event,void * arg)2437 void Channel::LibeventSendCallback(int fd, short event, void *arg)
2438 {
2439
2440 // Called by libevent when it is the requested send time.
2441 Time();
2442 dprintf("%s send callback\n",tintstr());
2443
2444 Channel * sender = (Channel*) arg;
2445 if (NOW<sender->next_send_time_-TINT_MSEC)
2446 dprintf("%s #%" PRIu32 " suspicious send %s<%s\n",tintstr(),
2447 sender->id(),tintstr(NOW),tintstr(sender->next_send_time_));
2448 if (sender->next_send_time_ != TINT_NEVER)
2449 sender->Send();
2450 }
2451
2452
StaticSendClose(evutil_socket_t socket,Address & addr,uint32_t peer_channel_id)2453 void Channel::StaticSendClose(evutil_socket_t socket,Address &addr, uint32_t peer_channel_id)
2454 {
2455 if (peer_channel_id == 0)
2456 return; // safety catch
2457
2458 struct evbuffer *evb = evbuffer_new();
2459 evbuffer_add_32be(evb, peer_channel_id); // His channel ID
2460 evbuffer_add_8(evb, SWIFT_HANDSHAKE);
2461 evbuffer_add_32be(evb, 0); // Initial channel ID
2462 evbuffer_add_8(evb, POPT_END); // Empty protocol options list
2463
2464 int r = SendTo(socket,addr,evb);
2465 if (r==-1)
2466 print_error("swift can't send datagram");
2467
2468 evbuffer_free(evb);
2469 }
2470