1 /*
2  *  sendrecv.cpp
3  *  most of the swift's state machine
4  *
5  *  Created by Victor Grishchenko on 3/6/09.
6  *  Copyright 2009-2016 TECHNISCHE UNIVERSITEIT DELFT. All rights reserved.
7  *
8  */
9 // Arno, 2013-06-11: Must come first to ensure SIZE_MAX etc are defined
10 #include "compat.h"
11 #include "bin_utils.h"
12 #include "swift.h"
13 #include <algorithm>  // kill it
14 #include <cassert>
15 #include <cfloat>
16 #include <sstream>
17 
18 using namespace swift;
19 using namespace std;
20 
21 struct event_base *Channel::evbase;
22 struct event Channel::evrecv;
23 
24 #define DEBUGTRAFFIC     1
25 
26 /** Arno: Victor's design allows a sender to choose some data to push to
27  * a receiver, if that receiver is not HINTing at data. Should be disabled
28  * when the receiver has a download rate limit.
29  */
30 #define ENABLE_SENDERSIZE_PUSH 0
31 
32 
33 #define ENABLE_CANCEL       0
34 
35 
/** Arno, 2011-11-24: When a rate limit is on and the download is in progress
 * we send HINTs for 2 chunks at a time. This constant can be used to
 * get greater granularity. Set to 0 for the original behavior.
 * Ric: set to 2 (note: the value defined below is currently 1); it should be
 * smaller than 4, or we need to change the hint strategy first.
 * Ric, 2013-05: We need to always send out at least one request for each
 * channel. Otherwise, if the link between two peers is heavily congested or
 * has a high packet loss rate, LEDBAT will close the connection (short
 * explanation).
 */
45 #define HINT_GRANULARITY    1 // chunks
46 
47 
48 
AddRequiredHashes(struct evbuffer * evb,bin_t pos,bool isretransmit)49 void Channel::AddRequiredHashes(struct evbuffer *evb, bin_t pos, bool isretransmit)
50 {
51     // We're either seeder or know what content integrity protection method
52     // is because seeder told us, so hs_out_ is guiding here so we can
53     // send peak hashes in first datagram.
54     // TODO: adjust DefaultHandshake from which hs_out_ is derived when
55     // an authoritative source proves the CIPM is UNIFIED_MERKLE.
56     //
57     if (transfer()->ttype() == FILE_TRANSFER) {
58         // Arno, 2013-02-25: Need to send peak bins always (also CIPM None)
59         // to cold clients to communicate tree size
60         if (ack_in_.is_empty() && hashtree() != NULL && hashtree()->peak_count() > 0) {
61             AddUnsignedPeakHashes(evb);
62         }
63 
64         if (hs_in_->cont_int_prot_ == POPT_CONT_INT_PROT_MERKLE) {
65             if (pos != bin_t::NONE)
66                 AddFileUncleHashes(evb,pos);
67         }
68     } else {
69         // LIVE
70         if (PeerIsSource())
71             return;
72 
73         // See if there is a first, or new signed munro to send
74         bin_t munro = bin_t::NONE;
75         if (hs_out_->cont_int_prot_ == POPT_CONT_INT_PROT_NONE) {
76             // No content integrity protection, so just report current pos as
77             // source pos == munro
78             LivePiecePicker *lpp = (LivePiecePicker *)transfer()->picker();
79             if (lpp == NULL) {
80                 // I am source
81                 LiveTransfer *lt = (LiveTransfer *)transfer();
82                 munro = lt->GetSourceCurrentPos();
83             } else
84                 munro = lpp->GetCurrentPos();
85         } else { //POPT_CONT_INT_PROT_UNIFIED_MERKLE
86             LiveHashTree *umt = (LiveHashTree *)hashtree();
87             if (pos == bin_t::NONE) {
88                 // Initially send last signed munro
89                 munro = umt->GetLastMunro();
90             } else {
91                 // After, send munro required for pos
92                 munro = umt->GetMunro(pos);
93                 if (munro == bin_t::NONE)
94                     return;
95             }
96         }
97 
98         if (pos == bin_t::NONE) {
99             // Initially send last signed munro
100             dprintf("%s #%" PRIu32 " last munro %s\n",tintstr(),id_,munro.str().c_str());
101             bool ahead=false;
102             if (ack_in_right_basebin_ != bin_t::NONE) {
103                 if (ack_in_right_basebin_ > munro.base_right())
104                     ahead = true;
105             }
106             // Don't send when peer has chunks in range, or when it's downloading from us (e.g. chunks earlier than munro)
107             if (munro != bin_t::NONE && ack_in_.is_empty(munro) && !munro_ack_rcvd_ && !ahead) {
108                 AddLiveSignedMunroHash(evb,munro);
109                 last_sent_munro_ = munro;
110             }
111         } else {
112             // After, send munro required for pos
113             // Don't repeat if same and not retransmit
114             bool diff = (munro != last_sent_munro_);
115             last_sent_munro_ = munro;
116 
117             dprintf("%s #%" PRIu32 " munro for %s is %s\n",tintstr(),id_,pos.str().c_str(), munro.str().c_str());
118 
119             if (isretransmit || diff)
120                 AddLiveSignedMunroHash(evb,munro);
121 
122             if (hs_in_->cont_int_prot_ == POPT_CONT_INT_PROT_UNIFIED_MERKLE)
123                 AddLiveUncleHashes(evb,pos,munro,isretransmit);
124         }
125     }
126 }
127 
128 
AddUnsignedPeakHashes(struct evbuffer * evb)129 void Channel::AddUnsignedPeakHashes(struct evbuffer *evb)
130 {
131     for (int i=0; i<hashtree()->peak_count(); i++) {
132         bin_t peak = hashtree()->peak(i);
133         evbuffer_add_8(evb, SWIFT_INTEGRITY);
134         evbuffer_add_chunkaddr(evb,peak,hs_out_->chunk_addr_);
135         evbuffer_add_hash(evb, hashtree()->peak_hash(i));
136         dprintf("%s #%" PRIu32 " +phash %s\n",tintstr(),id_,peak.str().c_str());
137     }
138 }
139 
140 
141 // SIGNPEAK
AddLiveSignedMunroHash(struct evbuffer * evb,bin_t munro)142 void Channel::AddLiveSignedMunroHash(struct evbuffer *evb, bin_t munro)
143 {
144     BinHashSigTuple bhst = BinHashSigTuple::NOBULL;
145     if (hs_out_->cont_int_prot_ == POPT_CONT_INT_PROT_NONE) {
146         // No content integrity protection, create dummy signature tuple to send
147         uint8_t dummysigdata[SWIFT_CIPM_NONE_SIGLEN];
148         memset(dummysigdata,0,SWIFT_CIPM_NONE_SIGLEN);
149         Signature sig(dummysigdata, SWIFT_CIPM_NONE_SIGLEN);
150         SigTintTuple sigtint(sig, NOW);
151         bhst = BinHashSigTuple(munro,Sha1Hash::ZERO,sigtint);
152     } else {
153         // Send signed munro hash
154         LiveHashTree *umt = (LiveHashTree *)hashtree();
155         bhst = umt->GetSignedMunro(munro);
156     }
157     if (bhst.bin() == bin_t::NONE) {
158         dprintf("%s #%" PRIu32 " !mhash %s\n",tintstr(),id_,munro.str().c_str());
159         return;
160     }
161 
162     if (hs_out_->cont_int_prot_ != POPT_CONT_INT_PROT_NONE) {
163         evbuffer_add_8(evb, SWIFT_INTEGRITY);
164         evbuffer_add_chunkaddr(evb,bhst.bin(),hs_out_->chunk_addr_);
165         evbuffer_add_hash(evb, bhst.hash());
166     }
167 
168     dprintf("%s #%" PRIu32 " +mhash %s\n",tintstr(),id_,bhst.bin().str().c_str());
169 
170     //fprintf(stderr,"AddLiveSignedMunroHash: speak %s %s\n", bhst.bin().str().c_str(), bhst.hash().hex().c_str() );
171 
172     evbuffer_add_8(evb, SWIFT_SIGNED_INTEGRITY);
173     evbuffer_add_chunkaddr(evb,bhst.bin(),hs_out_->chunk_addr_);
174     evbuffer_add_64be(evb, bhst.sigtint().time());
175     evbuffer_add(evb, bhst.sigtint().sig().bits(), bhst.sigtint().sig().length());
176 
177     dprintf("%s #%" PRIu32 " +sigh %s %d\n",tintstr(),id_,bhst.bin().str().c_str(), bhst.sigtint().sig().length());
178 }
179 
180 
181 
AddFileUncleHashes(struct evbuffer * evb,bin_t pos)182 void Channel::AddFileUncleHashes(struct evbuffer *evb, bin_t pos)
183 {
184     bin_t peak = hashtree()->peak_for(pos);
185     binvector bv;
186     while (pos!=peak && ((NOW&3)==3 || !pos.parent().contains(data_out_cap_)) &&
187             ack_in_.is_empty(pos.parent())) {
188         // Ric: TODO optimise.. send based on pkt loss statistics
189         //      the above is correct but should not happen at the beginning!
190         //while (pos!=peak && ack_in_.is_empty(pos.parent()) ) {
191         bin_t uncle = pos.sibling();
192         bv.push_back(uncle);
193         pos = pos.parent();
194     }
195 
196     // PPSP -04: Send in descending layer order
197     binvector::reverse_iterator iter;
198     for (iter=bv.rbegin(); iter != bv.rend(); iter++) {
199         bin_t uncle = *iter;
200         evbuffer_add_8(evb, SWIFT_INTEGRITY);
201         evbuffer_add_chunkaddr(evb,uncle,hs_out_->chunk_addr_);
202         evbuffer_add_hash(evb, hashtree()->hash(uncle));
203         dprintf("%s #%" PRIu32 " +hash %s\n",tintstr(),id_,uncle.str().c_str());
204     }
205 
206 }
207 
208 //SIGNPEAK
AddLiveUncleHashes(struct evbuffer * evb,bin_t pos,bin_t munro,bool isretransmit)209 void Channel::AddLiveUncleHashes(struct evbuffer *evb, bin_t pos, bin_t munro, bool isretransmit)
210 {
211     binvector bv;
212     if (isretransmit) {
213         // Select all uncles
214         while (pos!=munro) {
215             bin_t uncle = pos.sibling();
216             bv.push_back(uncle);
217             pos = pos.parent();
218         }
219     } else {
220         // Select only unsent uncles
221         // Ric: TODO check (remove data_out_cap??)
222         //      For the moment lets keep the old behaviour
223         while (pos!=munro && ((NOW&3)==3 || !pos.parent().contains(data_out_cap_)) &&
224                 ack_in_.is_empty(pos.parent())) {
225             //while (pos!=munro && ack_in_.is_empty(pos.parent()) ) {
226             bin_t uncle = pos.sibling();
227             bv.push_back(uncle);
228             pos = pos.parent();
229         }
230     }
231 
232     // PPSP -04: Send in descending layer order
233     binvector::reverse_iterator iter;
234     for (iter=bv.rbegin(); iter != bv.rend(); iter++) {
235         bin_t uncle = *iter;
236         evbuffer_add_8(evb, SWIFT_INTEGRITY);
237         evbuffer_add_chunkaddr(evb,uncle,hs_out_->chunk_addr_);
238         Sha1Hash h = hashtree()->hash(uncle);
239         if (h == Sha1Hash::ZERO) {
240             // TEMP SIGNPEAKTODO
241             fprintf(stderr,"SENDING ZERO HASH %s. PRESS\n", uncle.str().c_str());
242             fflush(stderr);
243         }
244         evbuffer_add_hash(evb,h);
245         dprintf("%s #%" PRIu32 " +hash %s\n",tintstr(),id_,uncle.str().c_str());
246         pos = pos.parent();
247     }
248 }
249 
250 
251 
252 
253 
ImposeHint()254 bin_t Channel::ImposeHint()
255 {
256     uint64_t twist = hs_in_->peer_channel_id_;  // got no hints, send something randomly
257 
258     twist &= hashtree()->peak(0).toUInt(); // FIXME may make it semi-seq here
259 
260     bin_t my_pick = binmap_t::find_complement(ack_in_, *(transfer()->ack_out()), twist);
261     my_pick.to_twisted(twist);
262     while (my_pick.base_length()>max(1,(int)cwnd_))
263         my_pick = my_pick.left();
264 
265     return my_pick.twisted(twist);
266 }
267 
268 
DequeueHint(bool * retransmitptr)269 bin_t Channel::DequeueHint(bool *retransmitptr)
270 {
271     bin_t send = bin_t::NONE;
272 
273     // Arno, 2012-01-23: Extra protection against channel loss, don't send DATA
274     if (last_recv_time_ < NOW-(3*TINT_SEC)) {
275         dprintf("%s #%" PRIu32 " dequeue hint aborted, long time no recv %s\n",tintstr(),id_, tintstr(last_recv_time_));
276         return bin_t::NONE;
277     }
278 
279     // Arno, 2012-07-27: Reenable Victor's retransmit, check for ACKs
280     *retransmitptr = false;
281     while (!data_out_tmo_.empty()) {
282         tintbin tb = data_out_tmo_.front();
283         data_out_tmo_.pop_front();
284         if (ack_in_.is_filled(tb.bin)) {
285             // chunk was acknowledged in meantime
286             continue;
287         } else {
288             send = tb.bin;
289             dprintf("%s #%" PRIu32 " dequeuing timed-out %s\n",tintstr(),id_, send.str().c_str());
290             *retransmitptr = true;
291             if (!send.is_base())
292                 dprintf("%s #%" PRIu32 " Error: retransmit, %s, if not a base bin!\n",tintstr(),id_, send.str().c_str());
293             break;
294         }
295     }
296 
297     if (ENABLE_SENDERSIZE_PUSH && send.is_none() && hint_in_.empty() && last_recv_time_>NOW-rtt_avg_-TINT_SEC) {
298         bin_t my_pick = ImposeHint(); // FIXME move to the loop
299         if (!my_pick.is_none()) {
300             hint_in_size_ += my_pick.base_offset();
301             hint_in_.push_back(my_pick);
302             dprintf("%s #%" PRIu32 " *hint %s\n",tintstr(),id_,my_pick.str().c_str());
303         }
304     }
305 
306     dprintf("%s #%" PRIu32 " dequeue size: %d\n",tintstr(),id_, (int)hint_in_size_ );
307 
308     while (!hint_in_.empty() && send.is_none()) {
309         bin_t hint = hint_in_.front().bin;
310         dprintf("%s #%" PRIu32 " dequeuing cand %s\n",tintstr(),id_, hint.str().c_str());
311 
312         tint time = hint_in_.front().time;
313         hint_in_size_ -= hint_in_.front().bin.base_length();
314         hint_in_.pop_front();
315 
316         if (time < min(NOW-TINT_SEC*3/2, NOW-(rtt_avg_<<2))) {
317             dprintf("%s #%" PRIu32 " Don't serve: hint %s is too old\n",tintstr(),id_, send.str().c_str());
318             continue;
319         }
320 
321         while (!hint.is_base()) { // FIXME optimize; possible attack
322             hint_in_.push_front(tintbin(time,hint.right()));
323             hint_in_size_ += hint_in_.front().bin.base_length();
324             hint = hint.left();
325         }
326 
327         if (!ack_in_.is_filled(hint))
328             send = hint;
329         else
330             dprintf("%s #%" PRIu32 " hint %s has already been acknowledged\n",tintstr(),id_,hint.str().c_str());
331     }
332 
333     dprintf("%s #%" PRIu32 " dequeued %s [%" PRIu64 "]\n",tintstr(),id_,send.str().c_str(),hint_in_size_);
334     return send;
335 }
336 
337 
AddHandshake(struct evbuffer * evb)338 void Channel::AddHandshake(struct evbuffer *evb)
339 {
340     // If peer not responding, try legacy swift protocol
341 #if ENABLE_FALLBACK_TO_LEGACY_PROTO == 1
342     if (sent_since_recv_ >= 3 && last_recv_time_ == 0)
343         hs_out_->ResetToLegacy();
344 #endif
345 
346     int encoded = -1;
347     if (hs_out_->version_ == VER_SWIFT_LEGACY) {
348         //dprintf("%s #%" PRIu32 " +hs swift legacy\n",tintstr(),id_ );
349         if (hs_in_ == NULL) { // initiating
350             evbuffer_add_8(evb, SWIFT_INTEGRITY);
351             evbuffer_add_32be(evb, bin_toUInt32(bin_t::ALL));
352             evbuffer_add_hash(evb, transfer()->swarm_id().roothash());
353             dprintf("%s #%" PRIu32 " +hash ALL %s\n",
354                     tintstr(),id_,transfer()->swarm_id().hex().c_str());
355         }
356         evbuffer_add_8(evb, SWIFT_HANDSHAKE);
357 
358         if (send_control_==CLOSE_CONTROL) {
359             encoded = 0;
360         } else
361             encoded = EncodeID(id_);
362         evbuffer_add_32be(evb, encoded);
363 
364         dprintf("%s #%" PRIu32 " +hs %x swift\n",tintstr(),id_,encoded);
365     } else { // IETF PPSP compliant
366         //dprintf("%s #%" PRIu32 " +hs ppsp\n",tintstr(),id_ );
367         evbuffer_add_8(evb, SWIFT_HANDSHAKE);
368         if (send_control_==CLOSE_CONTROL) {
369             encoded = 0;
370         } else
371             encoded = EncodeID(id_);
372         evbuffer_add_32be(evb, encoded);
373 
374         // Send protocol options
375         std::ostringstream cross;
376         if (send_control_ !=CLOSE_CONTROL) {
377             evbuffer_add_8(evb, POPT_VERSION);
378             evbuffer_add_8(evb, hs_out_->version_);
379             cross << "v" << hs_out_->version_ << " ";
380             evbuffer_add_8(evb, POPT_MIN_VERSION);
381             evbuffer_add_8(evb, hs_out_->min_version_);
382             cross << "nv" << hs_out_->version_ << " ";
383 
384             if (hs_in_ == NULL) { // initiating, send swarm ID
385                 evbuffer_add_8(evb, POPT_SWARMID);
386                 if (transfer()->ttype() == FILE_TRANSFER) {
387                     evbuffer_add_16be(evb, Sha1Hash::SIZE);
388                     evbuffer_add_hash(evb, transfer()->swarm_id().roothash());
389                 } else {
390                     SwarmPubKey spubkey = transfer()->swarm_id().spubkey();
391                     evbuffer_add_16be(evb, spubkey.length());
392                     evbuffer_add(evb,spubkey.bits(),spubkey.length());
393                 }
394                 cross << "sid " << transfer()->swarm_id().hex() << " ";
395             }
396             evbuffer_add_8(evb, POPT_CONT_INT_PROT);
397             evbuffer_add_8(evb, hs_out_->cont_int_prot_);
398             cross << "cipm " << hs_out_->cont_int_prot_ << " ";
399             if (hs_out_->cont_int_prot_ == POPT_CONT_INT_PROT_MERKLE) {
400                 evbuffer_add_8(evb, POPT_MERKLE_HASH_FUNC);
401                 evbuffer_add_8(evb, hs_out_->merkle_func_);
402                 cross << "mhf " << hs_out_->merkle_func_ << " ";
403             }
404             if (transfer()->ttype() == LIVE_TRANSFER && hs_out_->cont_int_prot_ != POPT_CONT_INT_PROT_NONE) {
405                 evbuffer_add_8(evb, POPT_LIVE_SIG_ALG);
406                 evbuffer_add_8(evb, hs_out_->live_sig_alg_);
407                 cross << "lsa " << hs_out_->live_sig_alg_ << " ";
408             }
409             evbuffer_add_8(evb, POPT_CHUNK_ADDR);
410             evbuffer_add_8(evb, hs_out_->chunk_addr_);
411             cross << "cam " << hs_out_->chunk_addr_ << " ";
412             if (transfer()->ttype() == LIVE_TRANSFER) {
413                 evbuffer_add_8(evb, POPT_LIVE_DISC_WND);
414                 // For POPT_CHUNK_ADDR_CHUNK32, saves all chunks
415                 // PPSPTODO forget
416                 if (hs_out_->chunk_addr_ == POPT_CHUNK_ADDR_BIN32 || hs_out_->chunk_addr_ == POPT_CHUNK_ADDR_CHUNK32)
417                     evbuffer_add_32be(evb, (uint32_t)hs_out_->live_disc_wnd_);
418                 else
419                     evbuffer_add_64be(evb, hs_out_->live_disc_wnd_);
420                 cross << "ldw " << std::hex << hs_out_->live_disc_wnd_ << std::dec << " ";
421             }
422         }
423         dprintf("%s #%" PRIu32 " +hs %x ppsp %s\n",tintstr(),id_,encoded, cross.str().c_str());
424 
425         evbuffer_add_8(evb, POPT_END);
426     }
427 
428     have_out_.clear();
429 }
430 
431 
Send()432 void Channel::Send()
433 {
434 
435     dprintf("%s #%" PRIu32 " Send called \n",tintstr(),id_);
436 
437     // Ledbat log
438     // Time - PingPong - SlowStart - CC - KeepAlive - Close - CCwindow - Loss
439     if (id_==1)
440         switch (send_control_) {
441         case KEEP_ALIVE_CONTROL:
442             lprintf("%lu \t %d \t %d \t %d \t %li \t %d \t %d \t %d \t %li \t %li\n", NOW-open_time_, 0, 0, 0, NOW-last_send_time_,
443                     0, 0, 0, dip_avg_, hint_out_size_);
444             break;
445         case PING_PONG_CONTROL:
446             lprintf("%lu \t %li \t %d \t %d \t %d \t %d \t %d \t %d \n", NOW-open_time_, NOW-last_send_time_, 0, 0, 0, 0, 0, 0);
447             break;
448         case SLOW_START_CONTROL:
449             lprintf("%lu \t %d \t %li \t %d \t %d \t %d \t %d \t %d \n", NOW-open_time_, 0, NOW-last_send_time_, 0, 0, 0, 0, 0);
450             break;
451         case AIMD_CONTROL:
452             lprintf("%lu \t %d \t %d \t %li \t %d \t %d \t %d \t %.2f \n", NOW-open_time_, 0, 0, NOW-last_send_time_, 0, 0, 0,
453                     cwnd_);
454             break;
455         case LEDBAT_CONTROL:
456             lprintf("%lu \t %d \t %d \t %li \t %d \t %d \t %d \t %.2f \t %li\n", NOW-open_time_, 0, 0, NOW-last_send_time_, 0, 0, 0,
457                     cwnd_, hint_in_size_);
458             break;
459         case CLOSE_CONTROL:
460             lprintf("%lu \t %d \t %d \t %d \t %d \t %li \t %d \t %d \n", NOW-open_time_, 0, 0, 0, 0, NOW-last_send_time_, 0, 0);
461             break;
462         default:
463             assert(false);
464             break;
465         }
466 
467     struct evbuffer *evb = evbuffer_new();
468     uint32_t pcid = 0;
469     if (hs_in_ != NULL)
470         pcid =  hs_in_->peer_channel_id_;
471 
472     evbuffer_add_32be(evb,pcid);
473     bin_t data = bin_t::NONE;
474     int evbnonadplen = 0;
475     if (send_control_==CLOSE_CONTROL) // Arno: send explicit close
476         AddHandshake(evb);
477     else {
478         if (is_established()) {
479             // FIXME: seeder check
480             AddHave(evb);
481             AddAck(evb);
482             //LIVE
483             if (hashtree() == NULL || !hashtree()->is_complete()) {
484                 AddHint(evb);
485                 /* Gertjan fix: 7aeea65f3efbb9013f601b22a57ee4a423f1a94d
486                 "Only call Reschedule for 'reverse PEX' if the channel is in keep-alive mode"
487                  */
488                 AddPexReq(evb);
489                 if (ENABLE_CANCEL)
490                     AddCancel(evb);
491             }
492             AddPex(evb);
493             TimeoutDataOut();
494             data = AddData(evb);
495         } else  {
496             AddHandshake(evb);
497             AddHave(evb); // Arno, 2011-10-28: from AddHandShake. Why double?
498             AddHave(evb);
499             AddAck(evb);
500         }
501     }
502     lastsendwaskeepalive_ = (evbuffer_get_length(evb) == 4);
503 
504     if (evbuffer_get_length(evb)==4) {// only the channel id; bare keep-alive
505         data = bin_t::ALL;
506     }
507 
508     dprintf("%s #%" PRIu32 " sent %ib %s:%x\n",
509             tintstr(),id_,(int)evbuffer_get_length(evb),peer().str().c_str(),
510             pcid);
511     last_send_time_ = NOW;
512 
513     int r = SendTo(socket_,peer(),evb);
514     if (r==-1)
515         print_error("swift can't send datagram");
516     else {
517         raw_bytes_up_ += r;
518         sent_since_recv_++;
519         dgrams_sent_++;
520     }
521     evbuffer_free(evb);
522     Reschedule();
523 }
524 
AddHint(struct evbuffer * evb)525 void Channel::AddHint(struct evbuffer *evb)
526 {
527 
528     // LIVE source
529     if (transfer()->picker() == NULL)
530         return;
531 
532     // RATELIMIT
533     // Policy is to not send hints when we are above speed limit
534     if (transfer()->GetCurrentSpeed(DDIR_DOWNLOAD) > transfer()->GetMaxSpeed(DDIR_DOWNLOAD)) {
535         if (DEBUGTRAFFIC)
536             fprintf(stderr,"hint: forbidden#");
537         return;
538     }
539 
540     FileTransfer * ft = (FileTransfer *)transfer();
541 
542     if (ft->hashtree()->is_complete())
543         return;
544 
545     // 1. Calc max of what we are allowed to request, uncongested bandwidth wise
546     tint plan_for = max(TINT_SEC*HINT_TIME,rtt_avg_<<2);
547     tint timed_out = NOW - plan_for*2;
548 
549     std::deque<bin_t> tbc;
550     while (!hint_out_.empty() && hint_out_.front().time < timed_out) {
551         bin_t hint = hint_out_.front().bin;
552         hint_out_size_ -= hint.base_length();
553         hint_out_.pop_front();
554         // Ric: keep track of what we want to remove
555         tbc.push_back(hint);
556         dprintf("%s #%" PRIu32 " remove hint %s\n",tintstr(),id_,hint.str().c_str());
557 
558     }
559 
560     // Ric: calculate the dip over the last period
561     //      use same approach as LEDBAT!
562     // We may apply a filter over the elements.. as suggested in the rfc
563     ttqueue::iterator it = dip_list_.begin();
564     int32_t count = 0;
565     int total = 0;
566     tint dip = 0;
567     tint timeout = NOW - plan_for/2;
568     // use the dip received during the last period, or the dip_avg_
569     while (it != dip_list_.end() && (it->second > timeout || count < 4)) {
570         total += it->first;
571         count++;
572         it++;
573     }
574     // clean up
575     while (dip_list_.size() > 4 && dip_list_.back().second < timeout) {
576         dip_list_.pop_back();
577     }
578     if (dip_list_.size() < 4)
579         dip = dip_avg_;
580     else
581         dip = total/count;
582 
583     int first_plan_pck = (tint)1;
584     if (dip != 0)
585       first_plan_pck = max((tint)1, plan_for / dip);
586 
587     // Riccardo, 2012-04-04: Actually allowed is max minus what we already asked for
588     int queue_allowed_hints = max(0,first_plan_pck-(int)hint_out_size_);
589 
590 
591     // RATELIMIT
592     // 2. Calc max of what is allowed by the rate limiter
593     int rate_allowed_hints = INT_MAX;
594     uint64_t rough_global_hint_out_size = 0; // rough estimate, as hint_out_ clean up is not done for all channels
595     bool count_hints = false;
596     if (transfer()->GetMaxSpeed(DDIR_DOWNLOAD) < DBL_MAX) {
597         channels_t::iterator iter;
598         for (iter=transfer()->GetChannels()->begin(); iter!=transfer()->GetChannels()->end(); iter++) {
599             Channel *c = *iter;
600             if (c != NULL)
601                 rough_global_hint_out_size += c->hint_out_size_;
602         }
603 
604         // Policy: this channel is allowed to hint at the limit - global_hinted_at
605         // Handle MaxSpeed = unlimited
606         double rate_hints_limit_float = HINT_TIME*transfer()->GetMaxSpeed(DDIR_DOWNLOAD)/((double)transfer()->chunk_size());
607 
608         // Ric: slow down if we just started the connection
609         double slowStart = (double)LONG_MAX;
610         tint running = now_t::now-start;
611         // It takes ~3 sec to get a stable DL speed estimation
612         if (running<TINT_SEC*3) {// && hint_out_size_>1) {
613             count_hints = true;
614             slowStart = rate_hints_limit_float*running/TINT_SEC;
615             if (slowStart>transfer()->GetSlowStartHints())
616                 slowStart = slowStart-transfer()->GetSlowStartHints();
617             else
618                 slowStart = 0;
619             if (DEBUGTRAFFIC)
620                 fprintf(stderr, "slowStart: %lf [%" PRIu32 "]\n", slowStart, transfer()->GetSlowStartHints());
621         }
622 
623         int rate_hints_limit = (int)min(slowStart,rate_hints_limit_float);
624 
625         // Actually allowed is max minus what we already asked for, globally (=all channels)
626         rate_allowed_hints = max(0,rate_hints_limit-(int)rough_global_hint_out_size);
627 
628         if (DEBUGTRAFFIC)
629             fprintf(stderr,"hint c%" PRIu32 ": rate_hints_limit %d rallow %d globout %" PRIu64 "\n", id(), rate_hints_limit,
630                     rate_allowed_hints, rough_global_hint_out_size);
631 
632     }
633     // Ric: test TODO
634     //if (DEBUGTRAFFIC)
635         //fprintf(stderr,"hint c%" PRIu32 ": %lf want %d qallow %d rallow %d chanout %" PRIu64 " globout %" PRIu64 "\n", id(),
636         //        transfer()->GetCurrentSpeed(DDIR_DOWNLOAD), first_plan_pck, queue_allowed_hints, rate_allowed_hints, hint_out_size_,
637         //        rough_global_hint_out_size);
638     dprintf("%s #%" PRIu32 "hint c%" PRIu32 ": %lf want %d qallow %d rallow %d chanout %" PRIu64 " globout %" PRIu64 "\n",tintstr(),id_, id(),
639                     transfer()->GetCurrentSpeed(DDIR_DOWNLOAD), first_plan_pck, queue_allowed_hints, rate_allowed_hints, hint_out_size_,
640                     rough_global_hint_out_size);
641     // 3. Take the smallest allowance from rate and queue limit
642     // Ric: test: TODO remove
643     //uint64_t plan_pck = (uint64_t)min(rate_allowed_hints,first_plan_pck);
644     uint64_t plan_pck = (uint64_t)min(rate_allowed_hints,queue_allowed_hints);
645 
646 
647     // 4. Ask allowance in blocks of chunks to get pipelining going from serving peer.
648     // Arno, 2012-10-30: not HINT_GRANULARITY for LIVE
649     //if (hint_out_size_ == 0 || plan_pck >= HINT_GRANULARITY || transfer()->ttype() == LIVE_TRANSFER)
650     if (plan_pck >= HINT_GRANULARITY || transfer()->ttype() == LIVE_TRANSFER) {
651         bin_t hint = bin_t::NONE;
652         if (transfer()->ttype() == LIVE_TRANSFER)
653             hint = transfer()->picker()->Pick(ack_in_,plan_pck,NOW+plan_for*2,id_);
654         else {
655             //fprintf(stderr, "want %d\tplan %d\n", want, plan_pck);
656             hint = DequeueHintOut(plan_pck);
657 
658             if (hint.is_none()) {
659                 bin_t res = transfer()->picker()->Pick(ack_in_,plan_pck,NOW+plan_for*2,id_);
660                 if (!res.is_none()) {
661                     hint_queue_out_.push_back(tintbin(NOW,res));
662                     hint_queue_out_size_ += res.base_length();
663                     hint = DequeueHintOut(plan_pck);
664                 }
665             }
666         }
667 
668         if (!hint.is_none()) {
669             if (DEBUGTRAFFIC) {
670                 fprintf(stderr,"hint c%d: ask %s\n", id(), hint.str().c_str());
671             }
672             evbuffer_add_8(evb, SWIFT_REQUEST);
673             evbuffer_add_chunkaddr(evb,hint,hs_out_->chunk_addr_);
674             dprintf("%s #%" PRIu32 " +hint %s [%" PRIi64 "]\n",tintstr(),id_,hint.str().c_str(),hint_out_size_);
675             dprintf("%s #%" PRIu32 " +hint base %s width %d\n",tintstr(),id_,hint.base_left().str().c_str(),
676                     (int)hint.base_length());
677             //fprintf(stderr,"send c%d: HINTLEN %i\n", id(), hint.base_length());
678             //fprintf(stderr,"HL %i ", hint.base_length());
679 
680 #if ENABLE_CANCEL == 1
681             // Ric: final cancel the hints that have been removed
682             while (!tbc.empty()) {
683                 bin_t c = tbc.front();
684                 if (!c.contains(hint) && !hint.contains(c))
685                     cancel_out_.push_back(c);
686                 else if (c.contains(hint))
687                     while (c.contains(hint) && c!=hint) {
688                         if (c>hint)
689                             c.to_left();
690                         else
691                             c.to_right();
692                         cancel_out_.push_back(c.sibling());
693                     }
694                 else if (c == hint)
695                     break;
696                 tbc.pop_front();
697             }
698 #endif
699             hint_out_.push_back(hint);
700             hint_out_size_ += hint.base_length();
701 
702             // Ric: keep track of the outstanding hints
703             if (count_hints)
704                 transfer()->SetSlowStartHints(hint.base_length());
705 
706             // RTTFIX
707             if (rtt_hint_tintbin_.bin == bin_t::NONE) {
708                 rtt_hint_tintbin_.bin = hint.base_left();
709                 rtt_hint_tintbin_.time = NOW;
710             }
711 
712             return;
713         } else
714             dprintf("%s #%" PRIu32 " Xhint\n",tintstr(),id_);
715     }
716 #if ENABLE_CANCEL == 1
717     // add the temporary cancel bin to the actual cancel queue
718     while (!tbc.empty()) {
719         cancel_out_.push_back(tbc.front());
720         tbc.pop_front();
721     }
722 #endif
723 }
724 
ChunkAddrSize(popt_chunk_addr_t ca)725 static int ChunkAddrSize(popt_chunk_addr_t ca)
726 {
727     switch (ca) {
728     case POPT_CHUNK_ADDR_BIN32:
729         return 4;
730     case POPT_CHUNK_ADDR_BYTE64:
731         return 2*8;
732     case POPT_CHUNK_ADDR_CHUNK32:
733         return 2*4;
734     case POPT_CHUNK_ADDR_BIN64:
735         return 8;
736     case POPT_CHUNK_ADDR_CHUNK64:
737         return 2*8;
738     }
739     return 0;
740 }
741 
AddCancel(struct evbuffer * evb)742 void Channel::AddCancel(struct evbuffer *evb)
743 {
744 
745     // SIGNPEAKTODO
746     return;
747 
748 
749     // Arno, 2013-01-15: take into account chunk addressing scheme
750     while (SWIFT_MAX_NONDATA_DGRAM_SIZE-evbuffer_get_length(evb) >= 1+ChunkAddrSize(hs_out_->chunk_addr_)
751             && !cancel_out_.empty()) {
752         bin_t cancel = cancel_out_.front();
753         cancel_out_.pop_front();
754         evbuffer_add_8(evb, SWIFT_CANCEL);
755         evbuffer_add_chunkaddr(evb,cancel,hs_out_->chunk_addr_);
756         dprintf("%s #%" PRIu32 " +cancel %s\n",
757                 tintstr(),id_,cancel.str().c_str());
758     }
759 }
760 
/**
 * Picks the next chunk to upload and appends it to the outgoing datagram as
 * a DATA message, after calling AddRequiredHashes() for it.
 *
 * @param evb  outgoing datagram under construction
 * @return the bin of the chunk that was appended, or bin_t::NONE when no
 *         chunk was added: upload speed above the configured maximum, file
 *         transfer whose hash tree reports size 0, send window/timer not
 *         open, no hint to serve, or an evbuffer/storage error.
 */
bin_t Channel::AddData(struct evbuffer *evb)
{
    // RATELIMIT
    if (transfer()->GetCurrentSpeed(DDIR_UPLOAD) > transfer()->GetMaxSpeed(DDIR_UPLOAD)) {
        transfer()->OnSendNoData();
        return bin_t::NONE;
    }
    //LIVE
    if (transfer()->ttype() == FILE_TRANSFER && !hashtree()->size()) // know nothing
        return bin_t::NONE;

    // Arno: If we are serving content, keep activated
    swift::Touch(transfer()->td());

    bin_t tosend = bin_t::NONE;
    bool isretransmit = false;
    tint luft = send_interval_>>4; // may wake up a bit earlier

    // Only dequeue a hint when the congestion window test passes and the next
    // send time (minus reschedule_delay_, plus the 1/16 interval of slack
    // above) has been reached.
    if ((data_out_size_<cwnd_ || cwnd_>0) && last_data_out_time_+send_interval_-reschedule_delay_<=NOW+luft) {
        tosend = DequeueHint(&isretransmit);
        if (tosend.is_none()) {
            dprintf("%s #%" PRIu32 " sendctrl no idea what data to send\n",tintstr(),id_);
            // Nothing requested: fall back to keep-alive unless we are
            // already there or closing.
            if (send_control_!=KEEP_ALIVE_CONTROL && send_control_!=CLOSE_CONTROL) {
                lprintf("\t\t==== Switch to Keep Alive Control (nothing to send) ==== \n");
                keepalivereason_ = NOTHING_TO_SEND;
                SwitchSendControl(KEEP_ALIVE_CONTROL);
            }
        }
    } else
        dprintf("%s #%" PRIu32 " sendctrl wait cwnd %f data_out %i next %s\n",
                tintstr(),id_,cwnd_,data_out_size_,tintstr(last_data_out_time_+send_interval_));

    // Add required hashes. Also for initial peaks and munros
    // Note this is called always, not just when there are requests pending.
    AddRequiredHashes(evb,tosend,isretransmit);

    if (tosend.is_none()) {// && (last_data_out_time_>NOW-TINT_SEC || data_out_.empty()))
        transfer()->OnSendNoData();
        return bin_t::NONE; // once in a while, empty data is sent just to check rtt FIXED
    }

    if (!ack_in_.is_empty()) // TODO: cwnd_>1
        data_out_cap_ = tosend;

    // Send hashes in separate datagram if first would get too big
    SendIfTooBig(evb);

    // Add chunk
    evbuffer_add_8(evb, SWIFT_DATA);
    evbuffer_add_chunkaddr(evb,tosend,hs_out_->chunk_addr_);
    // PPSPTODO LEDBAT current system time 64-bit
    if (hs_in_ != NULL && hs_in_->version_ == VER_PPSPP_v1) {
        // NOTE: Time updates NOW, so customary behavior where NOW is not
        // updated during the handling of a message (just at start) is no longer
        // there. Not sure if this matters.
        evbuffer_add_64be(evb, Time());
    }

    // Reserve one contiguous extent of chunk_size() bytes in evb so storage
    // can read straight into the datagram buffer.
    // NOTE(review): on every error return below, the DATA header written
    // above is still in evb.
    struct evbuffer_iovec vec;
    if (evbuffer_reserve_space(evb, transfer()->chunk_size(), &vec, 1) < 0) {
        print_error("error on evbuffer_reserve_space");
        return bin_t::NONE;
    }

    if (DEBUGTRAFFIC)
        dprintf("%s #%" PRIu32 " ?data reading swarm %llu\n",tintstr(),id_, tosend.base_offset()*transfer()->chunk_size());

    ssize_t r = transfer()->GetStorage()->Read((char *)vec.iov_base,
                transfer()->chunk_size(),tosend.base_offset()*transfer()->chunk_size());
    // TODO: corrupted data, retries, caching
    if (r <= 0) {
        print_error("error on reading");

        dprintf("%s #%" PRIu32 " !data %s\n",tintstr(),id_,tosend.str().c_str());
        // Commit zero bytes so the reserved space is released without
        // adding payload.
        vec.iov_len = 0;
        evbuffer_commit_space(evb, &vec, 1);
        return bin_t::NONE;
    }
    // assert(dgram.space()>=r+4+1);
    // Commit only the bytes actually read (r may be less than chunk_size()).
    vec.iov_len = r;
    if (evbuffer_commit_space(evb, &vec, 1) < 0) {
        print_error("error on evbuffer_commit_space");
        return bin_t::NONE;
    }

    // Bookkeeping for the chunk now in flight.
    last_data_out_time_ = NOW;
    data_out_.push_back(tosend);
    data_out_size_++;
    bytes_up_ += r;
    global_bytes_up += r;

    dprintf("%s #%" PRIu32 " +data %s\n",tintstr(),id_,tosend.str().c_str());

    // RATELIMIT
    // ARNOSMPTODO: count overhead bytes too? Move to Send() then.
    // Note: accounts a full chunk_size(), not the r bytes actually read.
    transfer_->OnSendData(transfer()->chunk_size());

    return tosend;
}
860 
861 
SendIfTooBig(struct evbuffer * evb)862 void Channel::SendIfTooBig(struct evbuffer *evb)
863 {
864     // Arno, 2011-11-03: May happen when first data packet is sent to empty
865     // leech, then peak + uncle hashes may be so big that they don't fit in eth
866     // frame with DATA. Send 2 datagrams then, one with peaks so they have
867     // a better chance of arriving. Optimistic violation of atomic datagram
868     // principle.
869     // Arno, 2013-05-14: Don't work if this is first msg, as peer_channel_id
870     // will be unknown. Then just continue adding to the first datagram and
871     // hope for the best.
872     if (is_established() && transfer()->chunk_size() == SWIFT_DEFAULT_CHUNK_SIZE
873             && evbuffer_get_length(evb) > SWIFT_MAX_NONDATA_DGRAM_SIZE) {
874 
875         dprintf("%s #%" PRIu32 " fsent %ib %s:%x\n",
876                 tintstr(),id_,(int)evbuffer_get_length(evb),peer().str().c_str(),
877                 hs_in_->peer_channel_id_);
878         int ret = Channel::SendTo(socket_,peer(),evb); // kind of fragmentation
879         if (ret > 0)
880             raw_bytes_up_ += ret;
881         evbuffer_add_32be(evb, hs_in_->peer_channel_id_);
882     }
883 }
884 
885 
AddAck(struct evbuffer * evb)886 void Channel::AddAck(struct evbuffer *evb)
887 {
888     if (data_in_==tintbin())
889         //if (data_in_.bin==bin64_t::NONE)
890         return;
891     // sometimes, we send a HAVE (e.g. in case the peer did repetitive send)
892     evbuffer_add_8(evb, data_in_.time==TINT_NEVER?SWIFT_HAVE:SWIFT_ACK);
893     evbuffer_add_chunkaddr(evb,data_in_.bin,hs_out_->chunk_addr_);
894     // PPSPTODO LEDBAT one-way delay
895     if (data_in_.time!=TINT_NEVER)
896         evbuffer_add_64be(evb, data_in_.time);
897 
898     if (DEBUGTRAFFIC)
899         fprintf(stderr,"send c%d: ACK %i\n", id(), bin_toUInt32(data_in_.bin));
900 
901     have_out_.set(data_in_.bin);
902     dprintf("%s #%" PRIu32 " +ack %s %" PRIi64 "\n",
903             tintstr(),id_,data_in_.bin.str().c_str(),data_in_.time);
904     if (data_in_.bin.layer()>2)
905         data_in_dbl_ = data_in_.bin;
906 
907 #if ENABLE_CANCEL == 1
908     // Ric: check that we are not sending a cancel msg for data_in_
909     std::deque<bin_t>::iterator it;
910     bin_t b = data_in_.bin;
911     for (it=cancel_out_.begin(); it!=cancel_out_.end(); it++) {
912         bin_t c = *it;
913         if (c == b) {
914             cancel_out_.erase(it);
915             break;
916         }
917         // b is always a single chunk :-)
918         else if (c.contains(b)) {
919             while (c.contains(b) && c!=b) {
920                 if (c>b) {
921                     cancel_out_.insert(it+1,c.right());
922                     c.to_left();
923                 } else {
924                     cancel_out_.insert(it+1,c.left());
925                     c.to_right();
926                 }
927             }
928             assert(c==b);
929             cancel_out_.erase(it);
930         }
931     }
932 #endif
933     data_in_ = tintbin();
934     //data_in_ = tintbin(NOW,bin64_t::NONE);
935 }
936 
937 
AddHave(struct evbuffer * evb)938 void Channel::AddHave(struct evbuffer *evb)
939 {
940     if (!data_in_dbl_.is_none()) { // TODO: do redundancy better
941         evbuffer_add_8(evb, SWIFT_HAVE);
942         evbuffer_add_chunkaddr(evb,data_in_dbl_,hs_out_->chunk_addr_);
943         data_in_dbl_=bin_t::NONE;
944     }
945     if (DEBUGTRAFFIC)
946         fprintf(stderr,"send c%d: HAVE ",id());
947 
948     // ZEROSTATE
949     if (transfer()->ttype() == FILE_TRANSFER && ((FileTransfer *)transfer())->IsZeroState()) {
950         if (is_established())
951             return;
952 
953         // Say we have peaks
954         for (int i=0; i<hashtree()->peak_count(); i++) {
955             bin_t peak = hashtree()->peak(i);
956             evbuffer_add_8(evb, SWIFT_HAVE);
957             evbuffer_add_chunkaddr(evb,peak,hs_out_->chunk_addr_);
958             dprintf("%s #%" PRIu32 " +have %s\n",tintstr(),id_,peak.str().c_str());
959         }
960         return;
961     }
962 
963     // SIGNPEAK
964     binmap_t *transfer_ack_out_ptr = transfer()->ack_out();
965     if (hs_in_ != NULL && hs_in_->cont_int_prot_ == POPT_CONT_INT_PROT_UNIFIED_MERKLE) {
966         // Arno, 2013-02-26: LIVE SIGNPEAK Cannot send HAVEs not covered by
967         // signed peak when source. Non-source peers will consequently only
968         // see and receive chunks covered by signed peaks.
969         LiveTransfer *lt = (LiveTransfer *)transfer();
970         if (lt->am_source())
971             transfer_ack_out_ptr = lt->ack_out_signed();
972     }
973     for (int count=0; count<4; count++) {
974         bin_t ack = binmap_t::find_complement(have_out_, *(transfer_ack_out_ptr), 0); // FIXME: do rotating queue
975         if (ack.is_none())
976             break;
977         ack = transfer_ack_out_ptr->cover(ack);
978         have_out_.set(ack);
979         evbuffer_add_8(evb, SWIFT_HAVE);
980         evbuffer_add_chunkaddr(evb,ack,hs_out_->chunk_addr_);
981 
982         if (DEBUGTRAFFIC)
983             fprintf(stderr," %i", bin_toUInt32(ack));
984 
985         dprintf("%s #%" PRIu32 " +have %s\n",tintstr(),id_,ack.str().c_str());
986     }
987     if (DEBUGTRAFFIC)
988         fprintf(stderr,"\n");
989 }
990 
991 
/**
 * Handles one incoming datagram for this channel: completes the handshake
 * if none has been received yet, then dispatches every message in evb to
 * its On*() handler until the buffer is empty or the channel has switched
 * to CLOSE_CONTROL, and finally reschedules the channel.
 *
 * For zero-state file transfers most messages go to the *ZeroState()
 * variants of the handlers.
 *
 * @param evb  payload of the received datagram. The debug line below adds 4
 *             to its length, presumably for a channel id already stripped
 *             by the caller - TODO confirm.
 */
void    Channel::Recv(struct evbuffer *evb)
{
    dprintf("%s #%" PRIu32 " recvd %ib\n",tintstr(),id_,(int)evbuffer_get_length(evb)+4);
    dgrams_rcvd_++;

    if (!transfer()->IsOperational()) {
        dprintf("%s #%" PRIu32 " recvd on broken transfer %d \n",tintstr(),id_, transfer()->td());
        CloseOnError();
        return;
    }

    // A datagram without any payload counts as a keep-alive.
    lastrecvwaskeepalive_ = (evbuffer_get_length(evb) == 0);
    if (lastrecvwaskeepalive_)
        // Update speed measurements such that they decrease when DL stops
        transfer()->OnRecvData(0);

    // First RTT estimate: time since our last send, taken only while
    // rtt_avg_/dev_avg_ still hold TINT_SEC/0 (presumably their initial
    // values - TODO confirm).
    if (last_send_time_ && rtt_avg_==TINT_SEC && dev_avg_==0) {
        rtt_avg_ = NOW - last_send_time_;
        dev_avg_ = rtt_avg_;
        dip_avg_ = rtt_avg_;
        dprintf("%s #%" PRIu32 " sendctrl rtt init %" PRIi64 "\n",tintstr(),id_,rtt_avg_);
        //fprintf(stderr,"%s #%" PRIu32 " sendctrl rtt init %" PRIi64 "\n",tintstr(),id_,rtt_avg_);
    }

    // NOTE(review): 'data' is only written below (by OnData); it is not read
    // afterwards in this function.
    bin_t data = evbuffer_get_length(evb) ? bin_t::NONE : bin_t::ALL;

    if (DEBUGTRAFFIC)
        fprintf(stderr,"recv c%" PRIu32 ": size " PRISIZET "\n", id(), evbuffer_get_length(evb));

    Handshake *hishs = NULL;
    if (hs_in_ == NULL) { // first reply from client
        if (hs_out_->version_ == VER_SWIFT_LEGACY) // I sent PPSPP_v1 HS, did not respond, now trying legacy
            hishs = StaticOnHandshake(peer_,id(),true,VER_SWIFT_LEGACY,evb);
        else
            hishs = StaticOnHandshake(peer_,id(),false,VER_PPSPP_v1,evb);
        if (hishs == NULL)
            return;
        else
            OnHandshake(hishs);
    }
    // Message loop: one type byte, then the handler consumes the rest of
    // that message from evb.
    while (evbuffer_get_length(evb) && send_control_!=CLOSE_CONTROL) {
        uint8_t type = evbuffer_remove_8(evb);

        if (DEBUGTRAFFIC)
            fprintf(stderr,"GOT %d\n", type);

        switch (type) {
        case SWIFT_HANDSHAKE: // explicit close
            hishs = StaticOnHandshake(peer_,id(),true,hs_in_->version_,evb);
            if (hishs == NULL)
                return;
            OnHandshake(hishs);
            break;
        case SWIFT_DATA:
            if (transfer()->ttype() == FILE_TRANSFER && ((FileTransfer *)transfer())->IsZeroState())
                OnDataZeroState(evb);
            else
                data=OnData(evb);
            break;
        case SWIFT_HAVE:
            if (transfer()->ttype() == FILE_TRANSFER && ((FileTransfer *)transfer())->IsZeroState())
                OnHaveZeroState(evb);
            else
                OnHave(evb);
            break;
        case SWIFT_ACK:
            OnAck(evb);
            break;
        case SWIFT_INTEGRITY:
            if (transfer()->ttype() == FILE_TRANSFER && ((FileTransfer *)transfer())->IsZeroState())
                OnHashZeroState(evb);
            else
                OnHash(evb);
            break;
        case SWIFT_SIGNED_INTEGRITY: // PPSP
            OnSignedHash(evb);
            break;
        case SWIFT_REQUEST:
            OnHint(evb);
            break;
        case SWIFT_CANCEL: // PPSP
            OnCancel(evb);
            break;
        case SWIFT_PEX_RESv4:
            if (transfer()->ttype() == FILE_TRANSFER && ((FileTransfer *)transfer())->IsZeroState())
                OnPexAddZeroState(evb,AF_INET);
            else
                OnPexAdd(evb,AF_INET);
            break;
        case SWIFT_PEX_RESv6: // PPSP
            if (transfer()->ttype() == FILE_TRANSFER && ((FileTransfer *)transfer())->IsZeroState())
                OnPexAddZeroState(evb,AF_INET6);
            else
                OnPexAdd(evb,AF_INET6);
            break;
        case SWIFT_PEX_REScert: // PPSP
            if (transfer()->ttype() == FILE_TRANSFER && ((FileTransfer *)transfer())->IsZeroState())
                OnPexAddCertZeroState(evb);
            else
                OnPexAddCert(evb);
            break;
        case SWIFT_PEX_REQ:
            if (transfer()->ttype() == FILE_TRANSFER && ((FileTransfer *)transfer())->IsZeroState())
                OnPexReqZeroState(evb);
            else
                OnPexReq();
            break;
        case SWIFT_CHOKE: // PPSP
            OnChoke(evb);
            break;
        case SWIFT_UNCHOKE: // PPSP
            OnUnchoke(evb);
            break;
        default:
            // Unknown message type: the rest of the datagram cannot be
            // parsed. Note this returns without updating last_recv_time_
            // and without calling Reschedule().
            dprintf("%s #%" PRIu32 " ?msg id unknown %i\n",tintstr(),id_,(int)type);
            return;
        }
    }
    if (DEBUGTRAFFIC) {
        fprintf(stderr,"\n");
    }

    last_recv_time_ = NOW;
    sent_since_recv_ = 0;

    // Arno: see if transfer still in working order
    transfer()->UpdateOperational();
    if (!transfer()->IsOperational()) {
        dprintf("%s #%" PRIu32 " recvd broke transfer %d \n",tintstr(),id_, transfer()->td());
        CloseOnError();
        return;
    }

    Reschedule();
}
1127 
1128 
/**
 * Closes this channel with CLOSE_SEND_IF_ESTABLISHED. Recv() calls this
 * when the transfer is found not to be operational.
 */
void Channel::CloseOnError()
{
    Close(CLOSE_SEND_IF_ESTABLISHED);
}
1133 
1134 
1135 /*
1136  * Arno: FAXME: HASH+DATA should be handled as a transaction: only when the
1137  * hashes check out should they be stored in the hashtree, otherwise revert.
1138  */
OnHash(struct evbuffer * evb)1139 void Channel::OnHash(struct evbuffer *evb)
1140 {
1141     if (hs_in_->cont_int_prot_ != POPT_CONT_INT_PROT_MERKLE
1142             && hs_in_->cont_int_prot_ != POPT_CONT_INT_PROT_UNIFIED_MERKLE) {
1143         dprintf("%s #%" PRIu32 " ?hash but no integrity prot\n",tintstr(),id_);
1144         return;
1145     }
1146 
1147     binvector bv = evbuffer_remove_chunkaddr(evb,hs_in_->chunk_addr_);
1148     if (bv.size() == 0 || bv.size() > 1) {
1149         // chunk spec for hash must be power-of-2 range, so must fit in single bin
1150         dprintf("%s #%" PRIu32 " ?hash bad chunk spec\n",tintstr(),id_);
1151         evbuffer_drain(evb, evbuffer_get_length(evb));
1152         Close(CLOSE_DO_NOT_SEND);
1153         return;
1154     }
1155     bin_t pos = bv.front();
1156     Sha1Hash hash = evbuffer_remove_hash(evb);
1157 
1158     dprintf("%s #%" PRIu32 " -hash %s\n",tintstr(),id_,pos.str().c_str());
1159     if (hashtree() != NULL && (hs_in_->cont_int_prot_ == POPT_CONT_INT_PROT_MERKLE
1160                                || hs_in_->cont_int_prot_ == POPT_CONT_INT_PROT_UNIFIED_MERKLE)) {
1161         if (transfer()->ttype() == LIVE_TRANSFER) {
1162             // Don't accept hashes from others as source
1163             LiveTransfer *lt = (LiveTransfer *)transfer();
1164             if (lt->am_source())
1165                 return;
1166         }
1167         hashtree()->OfferHash(pos,hash);
1168     }
1169 
1170     //fprintf(stderr,"HASH %" PRIi64 " hex %s\n",pos.toUInt(), hash.hex().c_str() );
1171 }
1172 
1173 
CleanHintOut(bin_t pos)1174 void Channel::CleanHintOut(bin_t pos)
1175 {
1176     int hi = 0;
1177     while (hi<hint_out_.size() && !hint_out_[hi].bin.contains(pos))
1178         hi++;
1179     if (hi==hint_out_.size())
1180         return; // something not hinted or hinted in far past
1181 
1182     // Ric: TODO allow reordering of arriving pkts
1183     while (hi--) { // removing likely snubbed hints
1184         bin_t hint = hint_out_.front().bin;
1185         hint_out_size_ -= hint.base_length();
1186         hint_out_.pop_front();
1187         dprintf("%s #%" PRIu32 " Clean outstanding hint %s\n",tintstr(),id_,hint.str().c_str());
1188 #if ENABLE_CANCEL == 1
1189         // Ric: add to the cancel queue
1190         cancel_out_.push_back(hint);
1191 #endif
1192     }
1193     while (hint_out_.front().bin!=pos) {
1194         tintbin f = hint_out_.front();
1195 
1196         assert(f.bin.contains(pos));
1197 
1198         if (pos < f.bin) {
1199             f.bin.to_left();
1200         } else {
1201             f.bin.to_right();
1202         }
1203 
1204         hint_out_.front().bin = f.bin.sibling();
1205         hint_out_.push_front(f);
1206     }
1207 #if ENABLE_CANCEL == 1
1208     // Ric: add to the cancel queue
1209     cancel_out_.push_back(hint_out_.front().bin);
1210 #endif
1211     hint_out_size_ -= hint_out_.front().bin.base_length();
1212     hint_out_.pop_front();
1213 }
1214 
DequeueHintOut(uint64_t size)1215 bin_t Channel::DequeueHintOut(uint64_t size)
1216 {
1217 
1218     if (DEBUGTRAFFIC)
1219         fprintf(stderr, "%s #%" PRIu32 " Dequeue hint out size (%" PRIu64 ") [%" PRIu64 " are req] ",tintstr(),id_,
1220                 hint_queue_out_size_, size);
1221 
1222     // check for hints that have already been downloaded
1223     // in order to optimise the retrieval, some bins might be outdated
1224     while (hint_queue_out_.size() && !transfer()->ack_out()->is_empty(hint_queue_out_.front().bin)) {
1225         if (DEBUGTRAFFIC)
1226             dprintf("%s #%" PRIu32 " candidate hint :%s not empty!\n",tintstr(),id_, hint_queue_out_.front().bin.str().c_str());
1227         tintbin tb = hint_queue_out_.front();
1228         bin_t bin = tb.bin;
1229         tint time = tb.time;
1230         hint_queue_out_.pop_front();
1231         hint_queue_out_size_ -= bin.base_length();
1232 
1233         while (!bin.is_base() && !transfer()->ack_out()->is_filled(bin)) {
1234             hint_queue_out_.push_front(tintbin(time, bin.right()));
1235             hint_queue_out_size_ += hint_queue_out_.front().bin.base_length();
1236             bin.to_left();
1237         }
1238 
1239         if (transfer()->ack_out()->is_empty(bin)) {
1240             hint_queue_out_.push_front(tintbin(time, bin));
1241             hint_queue_out_size_ += bin.base_length();
1242         }
1243     }
1244 
1245     // TODO check... the seconds should depend on previous speed of the peer
1246     while (hint_queue_out_.size() && hint_queue_out_.front().time<NOW-TINT_SEC*HINT_TIME*3/2) { // FIXME sec
1247         hint_queue_out_size_ -= hint_queue_out_.front().bin.base_length();
1248         if (DEBUGTRAFFIC)
1249             dprintf("%s #%" PRIu32 " Removing queued hint:%s\n",tintstr(),id_, hint_queue_out_.front().bin.str().c_str());
1250         hint_queue_out_.pop_front();
1251     }
1252 
1253     if (!hint_queue_out_size_) {
1254         if (DEBUGTRAFFIC)
1255             fprintf(stderr, " ..refill\n");
1256         return bin_t::NONE;
1257     }
1258 
1259     while (hint_queue_out_.front().bin.base_length()>size) {
1260         tintbin ret = hint_queue_out_.front();
1261         ret.bin.to_left();
1262         hint_queue_out_.front().bin = ret.bin.sibling();
1263         hint_queue_out_.push_front(ret);
1264     }
1265 
1266     bin_t res = hint_queue_out_.front().bin;
1267     hint_queue_out_.pop_front();
1268     hint_queue_out_size_ -= res.base_length();
1269     if (DEBUGTRAFFIC)
1270         fprintf(stderr, " sending %s [%llu]\n",res.str().c_str(),res.base_length());
1271     return res;
1272 
1273 }
1274 
1275 
/**
 * Handles an incoming DATA message: parses the chunk address (and, when our
 * own handshake version is VER_PPSPP_v1, a 64-bit peer timestamp), then
 * verifies and/or stores the payload and records the chunk in data_in_,
 * which AddAck() later turns into an ACK/HAVE.
 *
 * @param evb  datagram buffer positioned just after the message type byte
 * @return the bin of the accepted chunk, or bin_t::NONE when the chunk spec
 *         is invalid (channel is closed), the chunk is a duplicate, or the
 *         hash tree rejects the data.
 */
bin_t Channel::OnData(struct evbuffer *evb)     // TODO: HAVE NONE for corrupted data
{

    binvector bv = evbuffer_remove_chunkaddr(evb,hs_in_->chunk_addr_);
    if (bv.size() == 0 || bv.size() > 1) {
        // Chunk spec must denote single chunk
        dprintf("%s #%" PRIu32 " ?data bad chunk spec\n",tintstr(),id_);
        Close(CLOSE_DO_NOT_SEND);
        evbuffer_drain(evb, evbuffer_get_length(evb));
        return bin_t::NONE;
    }
    bin_t pos = bv.front();
    tint peer_time = TINT_NEVER;
    // NOTE(review): the sender side (AddData) keys the timestamp on
    // hs_in_->version_, this side on hs_out_->version_; presumably both
    // agree after the handshake - TODO confirm.
    if (hs_out_->version_ == VER_PPSPP_v1)
        peer_time = evbuffer_remove_64be(evb);

    // Arno: Assuming DATA last message in datagram
    if (evbuffer_get_length(evb) > transfer()->chunk_size()) {
        dprintf("%s #%" PRIu32 " !data chunk size mismatch %s: exp %" PRIu32 " got " PRISIZET "\n",tintstr(),id_,
                pos.str().c_str(), transfer()->chunk_size(), evbuffer_get_length(evb));
        fprintf(stderr,"WARNING: chunk size mismatch: exp %" PRIu32 " got " PRISIZET "\n",transfer()->chunk_size(),
                evbuffer_get_length(evb));
    }

    // Payload length: whatever is left in the datagram, capped at one chunk.
    int length = (evbuffer_get_length(evb) < transfer()->chunk_size()) ? evbuffer_get_length(
                     evb) : transfer()->chunk_size();
    if (!transfer()->ack_out()->is_empty(pos)) {
        // Arno, 2012-01-24: print message for duplicate
        dprintf("%s #%" PRIu32 " Ddata %s\n",tintstr(),id_,pos.str().c_str());
        evbuffer_drain(evb, length);
        // TINT_NEVER makes AddAck() answer with a HAVE instead of an ACK.
        data_in_ = tintbin(TINT_NEVER,transfer()->ack_out()->cover(pos));

        // Arno, 2012-01-24: Make sure data interarrival periods don't get
        // screwed up because of these (ignored) duplicates.
        UpdateDIP(pos);
        return bin_t::NONE;
    }

    // Make the payload contiguous so it can be handed over as one buffer.
    uint8_t *data = evbuffer_pullup(evb, length);

    //fprintf(stderr,"OnData: Got chunk %d / %" PRIi64 "\n", length, swift::SeqComplete(transfer()->fd()) );

    // SIGNPEAK
    if (hashtree() != NULL && (hs_in_->cont_int_prot_ == POPT_CONT_INT_PROT_MERKLE
                               || hs_in_->cont_int_prot_ == POPT_CONT_INT_PROT_UNIFIED_MERKLE)) {
        // Check integrity
        if (!hashtree()->OfferData(pos, (char*)data, length)) {
            evbuffer_drain(evb, length);
            dprintf("%s #%" PRIu32 " !data %s\n",tintstr(),id_,pos.str().c_str());
            return bin_t::NONE;
        }

        // Arno: If we are getting content, keep activated
        swift::Touch(transfer()->td());
    } else {
        // No content integrity checking, just write (TODO SIGN_ALL)
        int ret = transfer()->GetStorage()->Write(data,length,pos.base_offset()*transfer()->chunk_size());
        if (ret < 0)
            print_error("storage Write failed");
        else
            transfer()->ack_out()->set(pos);
    }

    evbuffer_drain(evb, length);
    dprintf("%s #%" PRIu32 " -data %s\n",tintstr(),id_,pos.str().c_str());

    if (DEBUGTRAFFIC)
        fprintf(stderr,"$ ");

    // Do swift API callbacks
    bin_t cover = transfer()->ack_out()->cover(pos);
    transfer()->Progress(cover);
    //if (cover.layer() >= 5) // Arno: update DL speed. Tested with 32K, presently = 2 ** 5 * chunk_size CHUNKSIZE
    //    transfer()->OnRecvData( pow((double)2,(double)5)*((double)transfer()->chunk_size()) );
    transfer()->OnRecvData(transfer()->chunk_size());

    // Remember what to acknowledge. Without a peer timestamp the time field
    // stays at NOW; with one it becomes NOW - peer_time.
    data_in_ = tintbin(NOW,bin_t::NONE);
    data_in_.bin = pos;
    // Ric: the time of the ack is the owd.
    if (peer_time!=TINT_NEVER)
        data_in_.time = NOW - peer_time;

    UpdateDIP(pos);
    CleanHintOut(pos);
    bytes_down_ += length;
    global_bytes_down += length;

    // SIGNPEAK
    // Purge hash tree, if desired
    // NOTE(review): the casts below are not guarded by a ttype() check; this
    // presumably relies on live_disc_wnd_ being POPT_LIVE_DISC_WND_ALL for
    // non-live transfers - TODO confirm.
    if (hs_out_->live_disc_wnd_ != POPT_LIVE_DISC_WND_ALL) {
        // Discard parts of tree no longer in window
        LiveTransfer *lt = (LiveTransfer *)transfer();
        LiveHashTree *umt = (LiveHashTree *)hashtree();
        lt->OnDataPruneTree(*hs_out_,pos,umt->GetNChunksPerSig());
    }

    return pos;
}
1374 
1375 
UpdateRTT(tint owd)1376 void Channel::UpdateRTT(tint owd)
1377 {
1378 
1379     // one-way delay calculations
1380     std::pair <tint,tint> delay(owd, NOW);
1381     owd_current_.push_front(delay);
1382 
1383     if (owd_min_bin_start_+ LEDBAT_ROLLOVER < NOW) {
1384         owd_min_bin_start_ = NOW;
1385         owd_min_bin_ = owd_min_bin_ == 9 ? 0 : owd_min_bin_ + 1;
1386         owd_min_bins_[owd_min_bin_] = owd;
1387     } else if (owd_min_bins_[owd_min_bin_]>owd)
1388         owd_min_bins_[owd_min_bin_] = owd;
1389 
1390     ack_rcvd_recent_++;
1391 
1392 }
1393 
UpdateDIP(bin_t pos)1394 void Channel::UpdateDIP(bin_t pos)
1395 {
1396     if (!pos.is_none()) {
1397         if (last_data_in_time_) {
1398             /*
1399             tint dip = NOW - last_data_in_time_;
1400             dip_avg_ = (dip_avg_*7 + dip) >> 3;
1401             */
1402             tint dip = NOW - last_data_in_time_;
1403             std::pair <tint,tint> dip_hist(dip, NOW);
1404             dip_list_.push_front(dip_hist);
1405             dip_avg_ = (dip_avg_*3 + dip) >> 2;
1406         }
1407         last_data_in_time_ = NOW;
1408     }
1409 
1410     // RTTFIX
1411     /* Arno: in a true client/server scenario the initial RTT is used to set
1412      * rtt_avg_ and it is never updated. If that gets a bad sample this can
1413      * kill performance. Simple workaround against too high values.
1414      * Ric: we need to prevent also against burst in the network link.
1415      * It also happens that RTT is too small
1416      */
1417     if (rtt_hint_tintbin_.bin == pos && IsComplete()) {
1418         tint diff = NOW - rtt_hint_tintbin_.time;
1419         // Conservative: only adjust rtt_avg_ if 2x smaller
1420         if (diff < rtt_avg_>>1) {
1421             dprintf("%s #%" PRIu32 " rtt adjust %" PRIi64 " -> %" PRIi64 "\n",tintstr(),id_,rtt_avg_,diff);
1422             rtt_avg_ = diff;
1423         }
1424         // Ric: check if our values are wrong!
1425         tint owd = data_in_.time;
1426         if (owd != TINT_NEVER && owd<<2 > rtt_avg_) {
1427             dprintf("%s #%" PRIu32 " rtt adjust %" PRIi64 " -> %" PRIi64 "\n",tintstr(),id_,rtt_avg_,diff);
1428             rtt_avg_ = owd<<2;
1429         }
1430         rtt_hint_tintbin_.bin = bin_t::NONE;
1431     }
1432 }
1433 
1434 
/**
 * Handles an incoming ACK message: chunk spec followed by a 64-bit value
 * that is treated as the one-way delay measured by the peer.
 *
 * Every acknowledged bin is recorded in ack_in_. If a matching entry is
 * still in data_out_, the RTT/deviation averages and the one-way-delay
 * history are updated and the entry is nulled; if it is only found in the
 * timed-out queue data_out_tmo_, that entry is nulled without touching the
 * statistics. An unparsable chunk spec closes the channel.
 *
 * @param evb  datagram buffer positioned just after the message type byte
 */
void Channel::OnAck(struct evbuffer *evb)
{

    binvector bv = evbuffer_remove_chunkaddr(evb,hs_in_->chunk_addr_);
    if (bv.size() == 0) {
        // Could not parse chunk spec
        dprintf("%s #%" PRIu32 " ?ack bad chunk spec\n",tintstr(),id_);
        Close(CLOSE_DO_NOT_SEND);
        evbuffer_drain(evb, evbuffer_get_length(evb));
        return;
    }
    tint peer_owd = evbuffer_remove_64be(evb);

    munro_ack_rcvd_ = true;

    binvector::iterator iter;
    for (iter=bv.begin(); iter != bv.end(); iter++) {
        bin_t ackd_pos = *iter;

        // FIXME FIXME: wrap around here
        if (ackd_pos.is_none()) // safety catch
            return; // likely, broken chunk/ insufficient hashes
        // For file transfers of known size, reject acks beyond the last chunk.
        if (transfer()->ttype() == FILE_TRANSFER && hashtree() != NULL && hashtree()->size()
                && ackd_pos.base_offset()>=hashtree()->size_in_chunks()) {
            eprintf("invalid ack: %s\n",ackd_pos.str().c_str());
            return;
        }
        ack_in_.set(ackd_pos);

        //fprintf(stderr,"OnAck: got bin %s is_complete %d\n", ackd_pos.str(), (int)ack_in_.is_complete_arno( transfer()->ack_out()->get_height() ));

        int di = 0, ri = 0;
        // find an entry for the send (data out) event
        // (skips nulled entries; stops at the first one covered by ackd_pos)
        while (di<data_out_.size() && (data_out_[di]==tintbin() || !ackd_pos.contains(data_out_[di].bin)))
            di++;
        // FUTURE: delayed acks
        // rule out retransmits
        // Ric: by ruling out retransmits we screw up ledbat calculations
        while (ri<data_out_tmo_.size() && !ackd_pos.contains(data_out_tmo_[ri].bin))
            ri++;

        // Log marker: '-' found in data_out_, 'R' found only in the
        // timed-out queue, '?' found in neither.
        dprintf("%s #%" PRIu32 " %cack %s owd:%" PRIi64 "\n",tintstr(),id_,
                di==data_out_.size() ? (ri==data_out_tmo_.size() ? '?':'R') : '-',ackd_pos.str().c_str(),peer_owd);

        if (di!=data_out_.size()) {
            // Ric: FIXME assuming direct sending of acks
            tint rtt = NOW-data_out_[di].time;

            // Ric: quickly adapt to new network changes! (with large owd samples the previous rtt values influence
            //if (owd > rtt_avg_)
            //   rtt_avg_ = (rtt_avg_*3 + rtt) >> 2;
            //else
            // Moving averages: rtt_avg_ keeps 7/8 of its old value, dev_avg_ 3/4.
            rtt_avg_ = (rtt_avg_*7 + rtt) >> 3;
            dev_avg_ = (dev_avg_*3 + tintabs(rtt-rtt_avg_)) >> 2;
            assert(data_out_[di].time!=TINT_NEVER);
            dprintf("%s #%" PRIu32 " rtt:%" PRIu64 ", rtt_avg:%" PRIu64 " dev:%" PRIu64 "\n", tintstr(), id_,rtt, rtt_avg_,
                    dev_avg_);

            UpdateRTT(peer_owd);
            dprintf("%s #%" PRIu32 " setting null %s\n",tintstr(),id_, data_out_[di].bin.str().c_str());
            // early loss detection by packet reordering
            /* TODO do we really need it?
            for (int re=0; re<di-MAX_REORDERING; re++) {
               if (data_out_[re]==tintbin())
                   continue;
               ack_not_rcvd_recent_++;
               data_out_size_--;
               data_out_tmo_.push_back(data_out_[re].bin);
               dprintf("%s #%" PRIu32 " Rdata %s\n",tintstr(),id_,data_out_.front().bin.str().c_str());
               data_out_cap_ = bin_t::ALL;
               data_out_[re] = tintbin();
            }*/
            // The entry is nulled in place rather than erased, so the queue
            // keeps its positions.
            data_out_size_--;
            data_out_[di]=tintbin();
        } else if (ri!=data_out_tmo_.size()) {
            // Ric: TODO test
            //UpdateRTT(peer_owd);
            data_out_tmo_[ri]=tintbin();
        }

    }

    // clear zeroed items
    // TODO: do we really need it?
    /*while (!data_out_.empty() && ( data_out_.front()==tintbin() ||
            ack_in_.is_filled(data_out_.front().bin) ) ) {
        dprintf("%s #%" PRIu32 " removing %s\n",tintstr(),id_, data_out_.front().bin.str().c_str());
        data_out_.pop_front();
    }
    while (!data_out_tmo_.empty() && ( data_out_tmo_.front()==tintbin() ||
            ack_in_.is_filled(data_out_tmo_.front().bin) ) )
        data_out_tmo_.pop_front();
    assert(data_out_.empty() || data_out_.front().time!=TINT_NEVER);*/
}
1529 
1530 
TimeoutDataOut()1531 void Channel::TimeoutDataOut()
1532 {
1533     // losses: timeouted packets
1534     // Ric: aggressively timeout only if in active transmission (using cc like LEDBAT)
1535     tint timeout = NOW - ack_timeout();
1536     if (send_control_!=LEDBAT_CONTROL)
1537         timeout -= ack_timeout()<<1;
1538 
1539     while (!data_out_.empty() &&
1540             (data_out_.front().time<timeout || data_out_.front()==tintbin())) {
1541         if (data_out_.front()!=tintbin() && ack_in_.is_empty(data_out_.front().bin)) {
1542             ack_not_rcvd_recent_++;
1543             data_out_cap_ = bin_t::ALL;
1544             // Ric: keep the original timing... otherwise calculations are wrong once
1545             //      we get the ack back
1546             data_out_tmo_.push_back(data_out_.front());
1547             data_out_size_--;
1548             dprintf("%s #%" PRIu32 " Tdata %s\n",tintstr(),id_,data_out_.front().bin.str().c_str());
1549         }
1550         data_out_.pop_front();
1551     }
1552     // clear retransmit queue of older items
1553     while (!data_out_tmo_.empty() && (data_out_tmo_.front()==tintbin()
1554                                       || data_out_tmo_.front().time<NOW-MAX_POSSIBLE_RTT)) {
1555         data_out_tmo_.pop_front();
1556         //data_out_size_--;
1557     }
1558 
1559     // use the same value to clean the delay samples
1560     while (owd_current_.size() > 4 && owd_current_.back().second < timeout) {
1561         owd_current_.pop_back();
1562     }
1563 }
1564 
1565 
OnHave(struct evbuffer * evb)1566 void Channel::OnHave(struct evbuffer *evb)
1567 {
1568 
1569     binvector bv = evbuffer_remove_chunkaddr(evb,hs_in_->chunk_addr_);
1570     if (bv.size() == 0) {
1571         // Could not parse chunk spec
1572         dprintf("%s #%" PRIu32 " ?have bad chunk spec\n",tintstr(),id_);
1573         Close(CLOSE_DO_NOT_SEND);
1574         evbuffer_drain(evb, evbuffer_get_length(evb));
1575         return;
1576     }
1577     binvector::iterator iter;
1578     for (iter=bv.begin(); iter != bv.end(); iter++) {
1579         bin_t ackd_pos = *iter;
1580 
1581         if (ackd_pos.is_none()) // safety catch
1582             return; // wow, peer has hashes
1583 
1584         // PPPLUG
1585         if (!hashtree()->is_complete() && transfer()->ttype() == FILE_TRANSFER) {
1586             FileTransfer *ft = (FileTransfer *)transfer();
1587 
1588             // Ric: update the availability if needed
1589             ft->availability()->set(id_, ack_in_, ackd_pos);
1590         }
1591 
1592         ack_in_.set(ackd_pos);
1593         dprintf("%s #%" PRIu32 " -have %s\n",tintstr(),id_,ackd_pos.str().c_str());
1594 
1595         if (transfer()->ttype() == LIVE_TRANSFER) {
1596             OnHaveLive(ackd_pos);
1597         }
1598     }
1599 }
1600 
1601 
OnHaveLive(bin_t ackd_pos)1602 void Channel::OnHaveLive(bin_t ackd_pos)
1603 {
1604     // Arno, 2012-01-09: Provide PiecePicker with info needed for hook-in.
1605     // Arno, 2013-02-14: Could be optimized such that only right-most bin
1606     // is communicated to LivePiecePicker.
1607     LiveTransfer *lt = (LiveTransfer *)transfer();
1608     if (!lt->am_source()) {
1609         if (hs_in_ != NULL && hs_in_->live_disc_wnd_ != POPT_LIVE_DISC_WND_ALL) {
1610             // Filter ack_in_ using live discard window. Peer will only have
1611             // the chunks in that window, so we should not pick outside.
1612 
1613             if (ack_in_right_basebin_ == bin_t::NONE || ackd_pos.base_right() > ack_in_right_basebin_) {
1614                 // 1. Update last non-empty base bin
1615                 ack_in_right_basebin_ = ackd_pos.base_right();
1616 
1617                 // 2. Calc start of window from last non-empty bin
1618                 if (ack_in_right_basebin_.layer_offset() >= hs_in_->live_disc_wnd_) {
1619                     bin_t firstbasepos = bin_t(0,ack_in_right_basebin_.layer_offset() - hs_in_->live_disc_wnd_+1);
1620 
1621                     // 3. Empty all bins before start of window
1622                     binvector cbv;
1623                     swift::chunk32_to_bin32(0, firstbasepos.layer_offset(), &cbv); // firsbasepos exclusive
1624                     binvector::iterator iter;
1625                     for (iter=cbv.begin(); iter != cbv.end(); iter++) {
1626                         bin_t cpos = *iter;
1627                         ack_in_.reset(cpos);
1628                     }
1629                     dprintf("%s #%" PRIu32 " have window %s %s\n",tintstr(),id_,firstbasepos.str().c_str(),
1630                             ack_in_right_basebin_.str().c_str());
1631                 }
1632             }
1633         }
1634 
1635         // Arno: it can happen that we receive a HAVE and have no hints
1636         // outstanding. In that case we should not wait till next_send_time_
1637         // but request directly. See send_control.cpp
1638         if (hint_out_.size() == 0) {
1639             live_have_no_hint_ = true;
1640             dprintf("%s #%" PRIu32 " have but no hints\n",tintstr(),id_);
1641         }
1642     }
1643 }
1644 
1645 
OnHint(struct evbuffer * evb)1646 void Channel::OnHint(struct evbuffer *evb)
1647 {
1648 
1649     binvector bv = evbuffer_remove_chunkaddr(evb,hs_in_->chunk_addr_);
1650     if (bv.size() == 0) {
1651         // Could not parse chunk spec
1652         dprintf("%s #%" PRIu32 " ?hint bad chunk spec\n",tintstr(),id_);
1653         Close(CLOSE_DO_NOT_SEND);
1654         evbuffer_drain(evb, evbuffer_get_length(evb));
1655         return;
1656     }
1657 
1658     binvector::iterator iter;
1659     for (iter=bv.begin(); iter != bv.end(); iter++) {
1660         bin_t hint = *iter;
1661 
1662         // Ric: TODO test
1663         tbqueue::iterator it = hint_in_.begin();
1664         while (it != hint_in_.end()) { // removing likely snubbed hints
1665             bin_t b = it->bin;
1666             if (hint.contains(b) || b.contains(hint))
1667                 dprintf("%s #%" PRIu32 " hint already requested   hint:%s - and:%s!!\n", tintstr(), id_, hint.str().c_str(),
1668                         b.str().c_str());
1669             it++;
1670         }
1671 
1672         // FIXME: wake up here
1673         hint_in_.push_back(hint);
1674         hint_in_size_ += hint.base_length();
1675         dprintf("%s #%" PRIu32 " -hint %s [%" PRIu64 "]\n",tintstr(),id_,hint.str().c_str(),hint_in_size_);
1676 
1677         // SIGNPEAK
1678         //if (hs_in_ != NULL && hs_in_->cont_int_prot_ == POPT_CONT_INT_PROT_UNIFIED_MERKLE)
1679         //    RemoveSinceSignedPeakTuples(hint);
1680     }
1681 }
1682 
1683 /*
1684  * Read full HANDSHAKE message from evb outside of a Channel context.
1685  */
StaticOnHandshake(Address & addr,uint32_t cid,bool ver_known,popt_version_t ver,struct evbuffer * evb)1686 Handshake *Channel::StaticOnHandshake(Address &addr, uint32_t cid, bool ver_known, popt_version_t ver,
1687                                       struct evbuffer *evb)
1688 {
1689     dprintf("StaticOnHandshake: %s id %" PRIu32 " known %d ver %d\n", addr.str().c_str(), cid, (int)ver_known, ver);
1690 
1691     Handshake *hs = new Handshake();
1692 
1693     if (!ver_known) {
1694         uint8_t msgid = evbuffer_remove_8(evb);
1695         if (msgid == SWIFT_INTEGRITY)
1696             ver = VER_SWIFT_LEGACY;
1697         else if (msgid == SWIFT_HANDSHAKE)
1698             ver = VER_PPSPP_v1;
1699         else {
1700             dprintf("%s #%" PRIu32 " ?hs unknown protocol %d %s\n", tintstr(),cid,msgid,addr.str().c_str());
1701             delete hs;
1702             return NULL;
1703         }
1704     }
1705 
1706     if (ver == VER_SWIFT_LEGACY) {
1707         //dprintf("%s #%" PRIu32 " -hs swift legacy\n", tintstr(), cid );
1708         hs->version_ = VER_SWIFT_LEGACY;
1709         if (cid == 0) {
1710             // He is initiating. Initiating handshake has SWIFT_HASH + root hash, reply doesn't
1711             if (evbuffer_get_length(evb)<4+1+4+Sha1Hash::SIZE) {
1712                 dprintf("%s #0 incorrect size %i initial handshake packet %s\n", tintstr(),(int)evbuffer_get_length(evb),
1713                         addr.str().c_str());
1714                 delete hs;
1715                 return NULL;
1716             }
1717             bin_t pos = bin_fromUInt32(evbuffer_remove_32be(evb));
1718             if (!pos.is_all()) {
1719                 dprintf("%s #%" PRIu32 " ?hs that is not the root hash %s\n",tintstr(), cid, addr.str().c_str());
1720                 delete hs;
1721                 return NULL;
1722             }
1723             Sha1Hash roothash = evbuffer_remove_hash(evb);
1724             SwarmID swarmid(roothash);
1725             hs->SetSwarmID(swarmid);
1726             dprintf("%s #%" PRIu32 " -hash ALL %s\n",tintstr(),cid,swarmid.hex().c_str());
1727         }
1728 
1729         // Read SWIFT_HANDSHAKE
1730         uint8_t msgid = evbuffer_remove_8(evb);
1731         hs->peer_channel_id_ = evbuffer_remove_32be(evb);
1732         hs->ResetToLegacy();
1733 
1734         dprintf("%s #%" PRIu32 " -hs swift %x\n",tintstr(),cid,hs->peer_channel_id_);
1735     } else if (ver == VER_PPSPP_v1) {
1736         // IETF PPSP compliant
1737         dprintf("%s #%" PRIu32 " -hs ietf ppsp\n", tintstr(),cid);
1738         hs->peer_channel_id_ = evbuffer_remove_32be(evb);
1739         bool end=false;
1740         uint8_t size8 = 0, i8=0;
1741         uint16_t size = 0;
1742         uint8_t *swarmidbytes = NULL;
1743         uint8_t *msgbitmapbytes = NULL;
1744         SwarmID swarmid;
1745         std::ostringstream cross;
1746         while (!end && evbuffer_get_length(evb) > 0) {
1747             popt_t poid = (popt_t)evbuffer_remove_8(evb);
1748             //dprintf("%s #%" PRIu32 " -hs popt %d\n", tintstr(),cid, (int)poid );
1749             switch (poid) {
1750             case POPT_VERSION:
1751                 hs->version_ = (popt_version_t)evbuffer_remove_8(evb);
1752                 cross << "v" << hs->version_ << " ";
1753                 break;
1754             case POPT_MIN_VERSION:
1755                 hs->min_version_ = (popt_version_t)evbuffer_remove_8(evb);
1756                 cross << "nv" << hs->min_version_ << " ";
1757                 break;
1758             case POPT_SWARMID:
1759                 size = evbuffer_remove_16be(evb);
1760                 if (size > POPT_MAX_SWARMID_SIZE || evbuffer_get_length(evb) < size) {
1761                     dprintf("%s #%" PRIu32 " ?hs popt swarmid too big\n",tintstr(),cid);
1762                     delete hs;
1763                     return NULL;
1764                 }
1765                 swarmidbytes = new uint8_t[size];
1766                 evbuffer_remove(evb,swarmidbytes,size);
1767                 swarmid = SwarmID(swarmidbytes,size);
1768                 delete swarmidbytes;
1769                 hs->SetSwarmID(swarmid);
1770                 cross << "sid " << swarmid.hex() << " ";
1771                 break;
1772             case POPT_CONT_INT_PROT:
1773                 hs->cont_int_prot_ = (popt_cont_int_prot_t)evbuffer_remove_8(evb);
1774                 cross << "cipm " << hs->cont_int_prot_ << " ";
1775                 break;
1776             case POPT_MERKLE_HASH_FUNC:
1777                 hs->merkle_func_ = (popt_merkle_func_t)evbuffer_remove_8(evb);
1778                 cross << "mhf " << hs->merkle_func_ << " ";
1779                 break;
1780             case POPT_LIVE_SIG_ALG:
1781                 hs->live_sig_alg_ = (popt_live_sig_alg_t)evbuffer_remove_8(evb);
1782                 cross << "lsa " << hs->live_sig_alg_ << " ";
1783                 break;
1784             case POPT_CHUNK_ADDR:
1785                 hs->chunk_addr_ = (popt_chunk_addr_t)evbuffer_remove_8(evb);
1786                 cross << "cam " << hs->chunk_addr_ << " ";
1787                 break;
1788             case POPT_LIVE_DISC_WND:
1789                 if (hs->chunk_addr_ == POPT_CHUNK_ADDR_BIN32 || hs->chunk_addr_ == POPT_CHUNK_ADDR_CHUNK32)
1790                     hs->live_disc_wnd_ = evbuffer_remove_32be(evb);
1791                 else
1792                     hs->live_disc_wnd_ = evbuffer_remove_64be(evb);
1793                 cross << "ldw " << std::hex << hs->live_disc_wnd_ << std::dec << " ";
1794                 break;
1795             case POPT_SUPP_MSGS:
1796                 size8 = evbuffer_remove_8(evb);
1797                 if (size8 > 8 || evbuffer_get_length(evb) < size8) {
1798                     dprintf("%s #%" PRIu32 " ?hs popt supp msgs too big\n",tintstr(),cid);
1799                     delete hs;
1800                     return NULL;
1801                 }
1802                 msgbitmapbytes = evbuffer_pullup(evb,size8);
1803                 evbuffer_drain(evb, size);
1804                 cross << "msgs " << std::hex;
1805                 for (i8=0; i8<size8; i8++)
1806                     cross << (int)msgbitmapbytes[i8];
1807                 cross << std::dec;
1808                 break;
1809             case POPT_END:
1810                 end = true;
1811                 dprintf("%s #%" PRIu32 " -hs %x ppsp %s\n", tintstr(), cid, hs->peer_channel_id_, cross.str().c_str());
1812                 break;
1813             default:
1814                 dprintf("%s #%" PRIu32 " ?hs popt id unknown %i\n",tintstr(),cid,(int)poid);
1815                 delete hs;
1816                 return NULL;
1817             }
1818         }
1819     }
1820 
1821     return hs;
1822 }
1823 
OnHandshake(Handshake * hishs)1824 void Channel::OnHandshake(Handshake *hishs)
1825 {
1826 
1827     dprintf("OnHandshake\n");
1828 
1829     if (hishs->peer_channel_id_ == 0) {
1830         // Arno: Got explicit close from peer, close channel and don't send reply
1831         dprintf("%s #%" PRIu32 " -hs close\n",tintstr(),id_);
1832         Close(CLOSE_DO_NOT_SEND);
1833         delete hishs;
1834         return;
1835     } else
1836         dprintf("%s #%" PRIu32 " -hs %x %s opened as channel %" PRIu32 "\n",tintstr(),id_,hishs->peer_channel_id_,
1837                 (hishs->version_ == VER_SWIFT_LEGACY) ? "swift" : "ppsp", id_);
1838 
1839     if (!hishs->IsSupported()) {
1840         dprintf("%s #%" PRIu32 " -hs unsupported\n",tintstr(),id_);
1841         Close(CLOSE_SEND);
1842         delete hishs;
1843         return;
1844     }
1845 
1846     hs_in_ = hishs;
1847     hs_in_->ReleaseSwarmID(); // save mem per channel
1848 
1849     // Self-connection check
1850     if (!SELF_CONN_OK) {
1851         uint32_t try_id = DecodeID(hishs->peer_channel_id_);
1852         // Arno, 2012-05-29: Fixed duplicate test
1853         if (channel(try_id) == this) {
1854             // this is a self-connection
1855             dprintf("%s #%" PRIu32 " -hs closing self\n",tintstr(),id_);
1856             Close(CLOSE_SEND);
1857             hs_in_ = NULL;
1858             delete hishs;
1859             return;
1860         }
1861     }
1862 
1863     if (hs_in_->version_ == VER_SWIFT_LEGACY)
1864         hs_out_->ResetToLegacy(); // he speaks legacy, so will I
1865 
1866     // FUTURE: channel forking
1867     if (is_established()) // when this was reply to our HS
1868         dprintf("%s #%" PRIu32 " established %s\n", tintstr(), id_, peer().str().c_str());
1869 }
1870 
1871 
OnCancel(struct evbuffer * evb)1872 void Channel::OnCancel(struct evbuffer *evb)
1873 {
1874 
1875     binvector bv = evbuffer_remove_chunkaddr(evb,hs_in_->chunk_addr_);
1876     if (bv.size() == 0) {
1877         // Could not parse chunk spec
1878         dprintf("%s #%" PRIu32 " ?cancel bad chunk spec\n",tintstr(),id_);
1879         Close(CLOSE_DO_NOT_SEND);
1880         evbuffer_drain(evb, evbuffer_get_length(evb));
1881         return;
1882     }
1883 
1884     // Arno, 2012-11-23: chunkaddr translated to list of bins, iterate and
1885     // process in two ways:
1886     // 1. Remove bins from hint_in_ that are contained in the cancelled bins.
1887     //    A hint may have been partially answered so it may be split into
1888     //    smaller bins.
1889     // 2. Split up bins from hint_in_ that have only been partially canceled.
1890     //    i.e., where cancelbin is contained by a bin in hint_in_
1891     //
1892     // If the hint is already in progress (i.e, already transmitted, not yet
1893     // acked), we let it be.
1894     //
1895     binvector::iterator iter;
1896     for (iter=bv.begin(); iter != bv.end(); iter++) {
1897         bin_t cancelbin = *iter;
1898         dprintf("%s #%" PRIu32 " -cancel %s\n",tintstr(),id_,cancelbin.str().c_str());
1899 
1900         // 1. Remove hint from hint_in_ if contained in cancelbin. Use Riccardo's solution:
1901         int hi = 0;
1902         while (hi<hint_in_.size() && !cancelbin.contains(hint_in_[hi].bin) && cancelbin != hint_in_[hi].bin)
1903             hi++;
1904 
1905         // something to cancel?
1906         if (hi != hint_in_.size()) {
1907             // Assumption: all fragments of a bin being cancelled consecutive in hint_in_
1908             do {
1909                 //dprintf("%s #%" PRIu32 " -cancel frag %s\n",tintstr(),id_,hint_in_[hi].bin.str().c_str());
1910                 hint_in_size_ -= hint_in_[hi].bin.base_length();
1911                 hint_in_.erase(hint_in_.begin()+hi);
1912                 if (hint_in_.size() == 0 || hi >= hint_in_.size())
1913                     break;
1914             } while (cancelbin.contains(hint_in_[hi].bin));
1915         }
1916 
1917         //dprintf("%s #%" PRIu32 " -cancel ORIG %s len %d\n",tintstr(),id_,cancelbin.str().c_str(), hint_in_.size() );
1918 
1919         // 2. Fragment hint from hint_in_ if it covers cancelbin. Use Riccardo's solution:
1920         hi = 0;
1921         while (hi<hint_in_.size() && !hint_in_[hi].bin.contains(cancelbin))
1922             hi++;
1923 
1924         // nothing to cancel
1925         if (hi==hint_in_.size())
1926             continue;
1927 
1928         // Split up hint
1929         tint origt = hint_in_[hi].time;
1930         bin_t origbin = hint_in_[hi].bin;
1931         binvector fragbins = swift::bin_fragment(origbin,cancelbin);
1932         // Erase original
1933         hint_in_size_ -= hint_in_[hi].bin.base_length();
1934         hint_in_.erase(hint_in_.begin()+hi);
1935         // Replace with fragments left
1936         binvector::iterator iter2;
1937         int idx=0;
1938         for (iter2=fragbins.begin(); iter2 != fragbins.end(); iter2++) {
1939             bin_t fragbin = *iter2;
1940             //dprintf("%s #%" PRIu32 " -cancel keep %s\n",tintstr(),id_,fragbin.str().c_str());
1941             tintbin newtb(origt,fragbin);
1942             hint_in_size_ += fragbin.base_length();
1943             hint_in_.insert(hint_in_.begin()+hi+idx,newtb);
1944             idx++;
1945         }
1946     }
1947 
1948     for (int i=0; i<hint_in_.size(); i++) {
1949         dprintf("%s #%" PRIu32 " -cancel NETS %s\n",tintstr(),id_,hint_in_[i].bin.str().c_str());
1950     }
1951 }
1952 
1953 
OnPexAdd(struct evbuffer * evb,int family)1954 void Channel::OnPexAdd(struct evbuffer *evb, int family)
1955 {
1956     Address addr = evbuffer_remove_pexaddr(evb,family);
1957     dprintf("%s #%" PRIu32 " -pex %s %s\n",tintstr(),id_,(addr.get_family() == AF_INET) ? "v4" : "v6", addr.str().c_str());
1958 
1959     if (transfer()->OnPexIn(addr))
1960         useless_pex_count_ = 0;
1961     else {
1962         dprintf("%s #%" PRIu32 " already channel to %s\n", tintstr(),id_,addr.str().c_str());
1963         useless_pex_count_++;
1964     }
1965     pex_request_outstanding_ = false;
1966 }
1967 
1968 
OnPexAddCert(struct evbuffer * evb)1969 void Channel::OnPexAddCert(struct evbuffer *evb)
1970 {
1971     OnPexAddCertZeroState(evb);
1972     dprintf("%s #%" PRIu32 " -pex cert\n",tintstr(),id_);
1973 }
1974 
1975 
OnChoke(struct evbuffer * evb)1976 void Channel::OnChoke(struct evbuffer *evb)
1977 {
1978     if (hs_in_->version_ == VER_SWIFT_LEGACY) { // FRAGRAND support
1979         evbuffer_remove_32be(evb); // read 4 random bytes
1980         return;
1981     }
1982 
1983     //PPSPTODO
1984     dprintf("%s #%" PRIu32 " -choke\n",tintstr(),id_);
1985 }
1986 
OnUnchoke(struct evbuffer * evb)1987 void Channel::OnUnchoke(struct evbuffer *evb)
1988 {
1989     //PPSPTODO
1990     dprintf("%s #%" PRIu32 " -unchoke\n",tintstr(),id_);
1991 }
1992 
1993 
OnSignedHash(struct evbuffer * evb)1994 void Channel::OnSignedHash(struct evbuffer *evb)
1995 {
1996     if (transfer()->ttype() != LIVE_TRANSFER) {
1997         dprintf("%s #%" PRIu32 " ?sigh not live swarm\n",tintstr(),id_);
1998         Close(CLOSE_DO_NOT_SEND);
1999         evbuffer_drain(evb, evbuffer_get_length(evb));
2000         return;
2001     }
2002 
2003     binvector bv = evbuffer_remove_chunkaddr(evb,hs_in_->chunk_addr_);
2004     if (bv.size() == 0 || bv.size() > 1) {
2005         // chunk spec for hash must be power-of-2 range, so must fit in single bin
2006         dprintf("%s #%" PRIu32 " ?sigh bad chunk spec\n",tintstr(),id_);
2007         Close(CLOSE_DO_NOT_SEND);
2008         evbuffer_drain(evb, evbuffer_get_length(evb));
2009         return;
2010     }
2011     bin_t pos = bv.front();
2012 
2013     tint source_tint = evbuffer_remove_64be(evb);
2014 
2015     uint16_t siglen = SWIFT_CIPM_NONE_SIGLEN;
2016     LiveHashTree *umt = (LiveHashTree *)hashtree();
2017     if (umt != NULL)
2018         siglen = umt->GetSigSizeInBytes();
2019 
2020     uint8_t *sigdata = new uint8_t[siglen];
2021     if (sigdata == NULL) {
2022         dprintf("%s #%" PRIu32 " ?sigh no mem siglen %" PRIu32 "\n",tintstr(),id_,siglen);
2023         Close(CLOSE_DO_NOT_SEND);
2024         evbuffer_drain(evb, evbuffer_get_length(evb));
2025         return;
2026     }
2027     evbuffer_remove(evb,sigdata,siglen);
2028     Signature sig(sigdata,siglen);
2029     delete sigdata;
2030 
2031     // Don't accept signed hashes from others as source
2032     LiveTransfer *lt = (LiveTransfer *)transfer();
2033     if (lt->am_source())
2034         return;
2035 
2036     if (umt != NULL) {
2037         SigTintTuple sigtint(sig,source_tint);
2038 
2039         bool newverified = umt->OfferSignedMunroHash(pos,sigtint);
2040         if (!newverified)
2041             dprintf("%s #%" PRIu32 " !sigh %s\n",tintstr(),id_,pos.str().c_str());
2042         else {
2043             if (source_tint+(SWIFT_LIVE_MAX_SOURCE_DIVERGENCE_TIME*TINT_SEC) < NOW)
2044                 dprintf("%s #%" PRIu32 " *sigh %s\n",tintstr(),id_,pos.str().c_str()); // outdated Sig
2045             else {
2046                 dprintf("%s #%" PRIu32 " -sigh %s\n",tintstr(),id_,pos.str().c_str());
2047 
2048                 LiveTransfer *lt = (LiveTransfer *)transfer();
2049                 lt->OnVerifiedMunroHash(pos,source_tint);
2050             }
2051         }
2052     } else if (hs_in_->cont_int_prot_ == POPT_CONT_INT_PROT_NONE) {
2053         dprintf("%s #%" PRIu32 " -sigh %s\n",tintstr(),id_,pos.str().c_str());
2054 
2055         LiveTransfer *lt = (LiveTransfer *)transfer();
2056         lt->OnVerifiedMunroHash(pos,source_tint);
2057     } else {
2058         dprintf("%s #%" PRIu32 " ?sigh %s\n",tintstr(),id_,pos.str().c_str());
2059     }
2060 }
2061 
2062 
2063 /*
2064  * Sending messages
2065  */
2066 
AddPex(struct evbuffer * evb)2067 void Channel::AddPex(struct evbuffer *evb)
2068 {
2069     // Gertjan fix: Reverse PEX
2070     // PEX messages sent to facilitate NAT/FW puncturing get priority
2071     if (!reverse_pex_out_.empty()) {
2072         do {
2073             tintbin pex_peer = reverse_pex_out_.front();
2074             reverse_pex_out_.pop_front();
2075             if (channels[(int) pex_peer.bin.toUInt()] == NULL)
2076                 continue;
2077             Address a = channels[(int) pex_peer.bin.toUInt()]->peer();
2078             // Arno, 2012-02-28: Don't send private addresses to non-private peers.
2079             if (!a.is_private() || (a.is_private() && peer().is_private())) {
2080                 evbuffer_add_pexaddr(evb, a);
2081                 dprintf("%s #%" PRIu32 " +pex (reverse) %s\n",tintstr(),id_,a.str().c_str());
2082             }
2083         } while (!reverse_pex_out_.empty() && (SWIFT_MAX_NONDATA_DGRAM_SIZE-evbuffer_get_length(evb)) >= 7);
2084 
2085         // Arno: 2012-02-23: Don't think this is right. Bit of DoS thing,
2086         // that you only get back the addr of people that got your addr.
2087         // Disable for now.
2088         //return;
2089     }
2090 
2091     if (!pex_requested_)
2092         return;
2093 
2094     // Arno, 2012-02-28: Don't send private addresses to non-private peers.
2095     int tries=0;
2096     Channel *c=NULL;
2097     Address a;
2098     while (true) {
2099         // Arno, 2011-10-03: Choosing Gertjan's RandomChannel over RevealChannel here.
2100         c = transfer()->RandomChannel(this);
2101         if (c == NULL || tries > 5) {
2102             pex_requested_ = false;
2103             return;
2104         }
2105         a = c->peer();
2106         if (!a.is_private() || (a.is_private() && peer().is_private()))
2107             break;
2108         tries++;
2109     }
2110 
2111     evbuffer_add_pexaddr(evb, a);
2112     dprintf("%s #%" PRIu32 " +pex %s\n",tintstr(),id_,a.str().c_str());
2113 
2114     pex_requested_ = false;
2115     /* Ensure that we don't add the same id to the reverse_pex_out_ queue
2116        more than once. */
2117     int chid = c->id();
2118     for (tbqueue::iterator i = channels[chid]->reverse_pex_out_.begin();
2119             i != channels[chid]->reverse_pex_out_.end(); i++)
2120         if ((int)(i->bin.toUInt()) == id_)
2121             return;
2122 
2123     dprintf("%s #%" PRIu32 " adding pex for channel %" PRIu32 " at time %s\n", tintstr(), chid,
2124             id_, tintstr(NOW + 2 * TINT_SEC));
2125     // Arno, 2011-10-03: should really be a queue of (tint,channel id(= uint32_t)) pairs.
2126     channels[chid]->reverse_pex_out_.push_back(tintbin(NOW + 2 * TINT_SEC, bin_t(id_)));
2127     if (channels[chid]->send_control_ == KEEP_ALIVE_CONTROL &&
2128             channels[chid]->next_send_time_ > NOW + 2 * TINT_SEC)
2129         channels[chid]->Reschedule();
2130 }
2131 
2132 
2133 
OnPexReq(void)2134 void Channel::OnPexReq(void)
2135 {
2136     dprintf("%s #%" PRIu32 " -pex req\n", tintstr(), id_);
2137     if (NOW > MIN_PEX_REQUEST_INTERVAL + last_pex_request_time_)
2138         pex_requested_ = true;
2139 }
2140 
AddPexReq(struct evbuffer * evb)2141 void Channel::AddPexReq(struct evbuffer *evb)
2142 {
2143     // Rate limit the number of PEX requests
2144     if (NOW < next_pex_request_time_)
2145         return;
2146 
2147     // If no answer has been received from a previous request, count it as useless
2148     if (pex_request_outstanding_)
2149         useless_pex_count_++;
2150 
2151     pex_request_outstanding_ = false;
2152 
2153     // Initiate at most SWIFT_MAX_CONNECTIONS connections
2154     if (transfer()->GetChannels()->size() >= SWIFT_MAX_OUTGOING_CONNECTIONS ||
2155             // Check whether this channel has been providing useful peer information
2156             useless_pex_count_ > 2) {
2157         // Arno, 2012-02-23: Fix: Code doesn't recover from useless_pex_count_ > 2,
2158         // let's just try again in 30s
2159         useless_pex_count_ = 0;
2160         next_pex_request_time_ = NOW + 30 * TINT_SEC;
2161 
2162         return;
2163     }
2164 
2165     dprintf("%s #%" PRIu32 " +pex req\n", tintstr(), id_);
2166     evbuffer_add_8(evb, SWIFT_PEX_REQ);
2167     /* Add a little more than the minimum interval, such that the other party is
2168        less likely to drop it due to too high rate */
2169     next_pex_request_time_ = NOW + MIN_PEX_REQUEST_INTERVAL * 1.1;
2170     pex_request_outstanding_ = true;
2171 }
2172 
2173 
2174 
2175 /*
2176  * Channel class methods
2177  */
2178 
LibeventReceiveCallback(evutil_socket_t fd,short event,void * arg)2179 void Channel::LibeventReceiveCallback(evutil_socket_t fd, short event, void *arg)
2180 {
2181     // Called by libevent when a datagram is received on the socket
2182     Time();
2183     dprintf("%s recv callback\n",tintstr());
2184 
2185     RecvDatagram(fd);
2186     event_add(&evrecv, NULL);
2187 }
2188 
RecvDatagram(evutil_socket_t socket)2189 void Channel::RecvDatagram(evutil_socket_t socket)
2190 {
2191     struct evbuffer *evb = evbuffer_new();
2192     Address addr;
2193     Handshake *hishs = NULL;
2194 
2195     RecvFrom(socket, addr, evb);
2196     size_t evboriglen = evbuffer_get_length(evb);
2197 
2198     dprintf("%s recvdgram " PRISIZET "\n",tintstr(),evboriglen);
2199 
2200 //#define return_log(...) { fprintf(stderr,__VA_ARGS__); evbuffer_free(evb); return; }
2201 #define return_log(...) { dprintf(__VA_ARGS__); evbuffer_free(evb); if (hishs != NULL) { delete hishs; } return; }
2202     if (evbuffer_get_length(evb)<4)
2203         return_log("socket layer weird: datagram < 4 bytes from %s (prob ICMP unreach)\n",addr.str().c_str());
2204 
2205     uint32_t mych = evbuffer_remove_32be(evb);
2206 
2207     Channel* channel = NULL;
2208     if (mych==0) { // peer initiates handshake
2209 
2210         hishs = StaticOnHandshake(addr,0,false,VER_PPSPP_v1,evb);
2211         if (hishs == NULL) // dprintf already called
2212             return_log("%s #0 ?hs bad\n",tintstr());
2213 
2214         SwarmID swarmid = hishs->GetSwarmID();
2215         int td = swift::Find(swarmid,true); // Activate
2216         if (td < 0) {
2217             // No known swarm, check if available as zero state
2218             ZeroState *zs = ZeroState::GetInstance();
2219             td = zs->Find(swarmid.roothash());
2220             if (td == -1) {
2221                 // Don't reply to strangers knocking
2222                 //StaticSendClose(socket,addr,hishs->peer_channel_id_);
2223                 return_log("%s #0 swarm %s unknown, requested by %s\n",tintstr(),hishs->GetSwarmID().hex().c_str(),addr.str().c_str());
2224             }
2225         }
2226         ContentTransfer *ct = swift::GetActivatedTransfer(td);
2227         if (ct == NULL) {
2228             StaticSendClose(socket,addr,hishs->peer_channel_id_);
2229             return_log("%s #0 swarm %s known, couldn't be activated; requested by %s\n", tintstr(),
2230                        hishs->GetSwarmID().hex().c_str(), addr.str().c_str());
2231         } else if (!ct->IsOperational()) {
2232             // Activated, but broken
2233             StaticSendClose(socket,addr,hishs->peer_channel_id_);
2234             return_log("%s #0 swarm %s broken, requested by %s\n",tintstr(),hishs->GetSwarmID().hex().c_str(),addr.str().c_str());
2235         }
2236 
2237         // Arno, 2012-02-27: Check for duplicate channel
2238         Channel* existchannel = ct->FindChannel(addr,NULL);
2239         if (existchannel != NULL) {
2240             // Arno: 2011-10-13: Ignore if established, otherwise consider
2241             // it a concurrent connection attempt.
2242             if (existchannel->is_established()) {
2243                 // ARNOTODO: Read complete handshake here so we know whether
2244                 // attempt is to new channel or to existing. Currently read
2245                 // in OnHandshake()
2246                 //
2247                 // Arno, 2012-12-17: in Android app peers have hardwired port
2248                 // so this happens often. Assuming that sender has reasons
2249                 // to rehandshake, now just close old.
2250                 dprintf("%s #0 have a channel already to %s, closing old\n",tintstr(),addr.str().c_str());
2251 
2252                 // Arno, 2012-12-17: On Android closing the channel causes swift
2253                 // to crash. On Win32 I don't see this behaviour. For now, let
2254                 // the channel die out by itself. The sender will not accept
2255                 // the datagrams sent by this peer on the old channel because
2256                 // it doesn't know the old channel ID.
2257                 // existchannel->Close(CLOSE_DO_NOT_SEND);
2258                 channel = NULL;
2259             } else {
2260                 channel = existchannel;
2261                 //fprintf(stderr,"Channel::RecvDatagram: HANDSHAKE: reuse channel %s\n", channel->peer_.str() );
2262             }
2263         }
2264         if (channel == NULL) {
2265             if (ct->GetChannels()->size() < SWIFT_MAX_INCOMING_CONNECTIONS) {
2266                 //fprintf(stderr,"Channel::RecvDatagram: HANDSHAKE: create new channel %s\n", addr.str().c_str() );
2267                 channel = new Channel(ct, socket, addr);
2268             } else {
2269                 // Too many connections
2270                 StaticSendClose(socket,addr,hishs->peer_channel_id_);
2271                 return_log("%s #0 swarm %s too many connections, requested by %s\n",tintstr(),hishs->GetSwarmID().hex().c_str(),
2272                            addr.str().c_str());
2273             }
2274         }
2275         //fprintf(stderr,"CHANNEL INCOMING DEF hass %s is id %d\n",hishs->GetSwarmID().hex().c_str(),channel->id());
2276 
2277         channel->OnHandshake(hishs);
2278 
2279     } else if (CmdGwTunnelCheckChannel(mych)) {
2280         // SOCKTUNNEL
2281         CmdGwTunnelUDPDataCameIn(addr,mych,evb);
2282         evbuffer_free(evb);
2283         return;
2284     } else { // peer responds to my handshake (and other messages)
2285         mych = DecodeID(mych);
2286         if (mych>=channels.size())
2287             return_log("%s invalid channel #%" PRIu32 ", %s\n",tintstr(),mych,addr.str().c_str());
2288         channel = channels[mych];
2289         if (!channel)
2290             return_log("%s #%" PRIu32 " is already closed\n",tintstr(),mych);
2291         if (channel->IsDiffSenderOrDuplicate(addr,mych)) {
2292             dprintf("%s #%" PRIu32 " ?channel diff or dup\n",tintstr(),mych);
2293             channel->Close(CLOSE_SEND_IF_ESTABLISHED);
2294             delete channel; // safe, not in a channel event
2295             return_log("%s #%" PRIu32 " is duplicate\n",tintstr(),mych);
2296         }
2297         channel->own_id_mentioned_ = true;
2298     }
2299     channel->raw_bytes_down_ += evboriglen;
2300     //dprintf("recvd %i bytes for %i\n",data.size(),channel->id);
2301     bool wasestablished = channel->is_established();
2302 
2303     //dprintf("%s #%" PRIu32 " peer %s recv_peer %s addr %s\n", tintstr(),mych, channel->peer().str().c_str(), channel->recv_peer().str(), addr.str() );
2304 
2305     // Process messages
2306     if (channel->send_control_!=CLOSE_CONTROL)
2307         channel->Recv(evb);
2308 
2309     evbuffer_free(evb);
2310 
2311     //SAFECLOSE
2312     if (channel->send_control_==CLOSE_CONTROL) {
2313         // Arno, 2012-07-27: Received an explict close, clean up channel
2314         delete channel;
2315     }
2316 }
2317 
2318 
2319 
2320 /*
2321  * Channel instance methods
2322  */
2323 
CloseChannelByAddress(const Address & addr)2324 void Channel::CloseChannelByAddress(const Address &addr)
2325 {
2326     // fprintf(stderr,"CloseChannelByAddress: address is %s\n", addr.str().c_str() );
2327 
2328     dprintf("%s #-1 close channel by address %s\n",tintstr(), addr.str().c_str());
2329     channels_t::iterator iter;
2330     for (iter = channels.begin(); iter != channels.end(); iter++) {
2331         Channel *c = *iter;
2332         if (c != NULL && c->peer_ == addr) {
2333             dprintf("%s #%" PRIu32 " close by addr\n",tintstr(),c->id());
2334             c->Close(CLOSE_DO_NOT_SEND);
2335             delete c; // safe, not in a send event, and doesn't modify channels
2336             break;
2337         }
2338     }
2339 }
2340 
Close(close_send_t closesend)2341 void Channel::Close(close_send_t closesend)
2342 {
2343 
2344     dprintf("%s #%" PRIu32 " closing channel\n",tintstr(),id_);
2345 
2346     lprintf("\t\t==== Switch to Close Control ==== \n");
2347     this->SwitchSendControl(CLOSE_CONTROL);
2348 
2349     if (closesend == CLOSE_SEND || (closesend == CLOSE_SEND_IF_ESTABLISHED && is_established()))
2350         this->Send(); // Arno: send explicit close
2351 
2352     if (hs_in_ != NULL) // is_established() -> false
2353         hs_in_->peer_channel_id_ = 0;
2354 
2355     //LIVE
2356     if (transfer()->ttype() == FILE_TRANSFER) {
2357         FileTransfer *ft = (FileTransfer *)transfer();
2358         if (!ft->IsZeroState()
2359                 && ft->availability() != NULL) { // availability() is NULL when this is called from ContentTransfer/CloseChannels()
2360             // Ric: remove its binmap from the availability
2361             ft->availability()->removeBinmap(id_, ack_in_);
2362         }
2363     }
2364 
2365     // SAFECLOSE
2366     // Arno: ensure LibeventSendCallback is no longer called with ptr to this Channel
2367     ClearEvents();
2368 }
2369 
2370 
2371 
Reschedule()2372 void Channel::Reschedule()
2373 {
2374 
2375     // Arno: CAREFUL: direct send depends on diff between next_send_time_ and
2376     // NOW to be 0, so any calls to Time in between may put things off. Sigh.
2377     Time();
2378 
2379     if (evsend_ptr_ == NULL) {
2380         dprintf("%s #%" PRIu32 " cannot requeue for %s, closed\n",tintstr(),id_,tintstr(next_send_time_));
2381         return;
2382     }
2383     struct timeval currtv;
2384     // remove pending events if in keep-alive mode
2385     if (send_control_ == KEEP_ALIVE_CONTROL && evtimer_pending(evsend_ptr_, &currtv)) {
2386         evtimer_del(evsend_ptr_);
2387     }
2388 
2389     dprintf("%s schedule\n",tintstr());
2390 
2391     // Ric: before rescheduling check if we already have scheduled it in the past.
2392     // calculate the delay only if the reschedule has been called after a send, ignore reschedules
2393     // triggered by something received.
2394     if (last_send_time_>next_send_time_ && next_send_time_<NOW && send_control_ == LEDBAT_CONTROL) {
2395         dprintf("%s #%" PRIu32 " Already something scheduled for: %s\n",tintstr(),id_, tintstr(next_send_time_));
2396         reschedule_delay_ = NOW - next_send_time_;
2397         dprintf("%s #%" PRIu32 " reschedule delay :%" PRIi64 "\n",tintstr(),id_,reschedule_delay_);
2398     }
2399 
2400     next_send_time_ = NextSendTime();
2401     dprintf("%s #%" PRIu32 " next send in :%" PRIi64 "\n",tintstr(),id_,next_send_time_-NOW);
2402     if (next_send_time_!=TINT_NEVER) {
2403 
2404         assert(next_send_time_<NOW+TINT_MIN);
2405         tint duein = next_send_time_-NOW;
2406 
2407         if (duein <= 0) {
2408             // Arno, 2011-10-18: libevent's timer implementation appears to be
2409             // really slow, i.e., timers set for 100 usec from now get called
2410             // at least two times later :-( Hence, for sends after receives
2411             // perform them directly.
2412             // Ric: The timer error of libevent can be propagated throughout the
2413             // download.. keep track of the delay error!
2414             dprintf("%s #%" PRIu32 " requeue direct send (%s)\n",tintstr(),id_, duein<=0 ? "duein" : "direct sending");
2415             //next_send_time_ = NOW;
2416 
2417             LibeventSendCallback(-1,EV_TIMEOUT,this);
2418         } else {
2419             struct timeval duetv = *tint2tv(duein);
2420             evtimer_add(evsend_ptr_,&duetv);
2421             dprintf("%s #%" PRIu32 " requeue for %s in %" PRIi64 "\n",tintstr(),id_,tintstr(next_send_time_), duein);
2422         }
2423     } else {
2424         // SAFECLOSE
2425         dprintf("%s #%" PRIu32 " resched, will close\n",tintstr(),id_);
2426         // Arno: Cannot clean up send events or channel here as we may be
2427         // LibeventSendCallback() on a specific channel. Do on another event,
2428         // see ContentTransfer::LibeventCleanCallback()
2429         this->Schedule4Delete();
2430     }
2431 }
2432 
2433 
2434 /*
2435  * Channel class methods
2436  */
LibeventSendCallback(int fd,short event,void * arg)2437 void Channel::LibeventSendCallback(int fd, short event, void *arg)
2438 {
2439 
2440     // Called by libevent when it is the requested send time.
2441     Time();
2442     dprintf("%s send callback\n",tintstr());
2443 
2444     Channel * sender = (Channel*) arg;
2445     if (NOW<sender->next_send_time_-TINT_MSEC)
2446         dprintf("%s #%" PRIu32 " suspicious send %s<%s\n",tintstr(),
2447                 sender->id(),tintstr(NOW),tintstr(sender->next_send_time_));
2448     if (sender->next_send_time_ != TINT_NEVER)
2449         sender->Send();
2450 }
2451 
2452 
StaticSendClose(evutil_socket_t socket,Address & addr,uint32_t peer_channel_id)2453 void Channel::StaticSendClose(evutil_socket_t socket,Address &addr, uint32_t peer_channel_id)
2454 {
2455     if (peer_channel_id == 0)
2456         return; // safety catch
2457 
2458     struct evbuffer *evb = evbuffer_new();
2459     evbuffer_add_32be(evb, peer_channel_id);  // His channel ID
2460     evbuffer_add_8(evb, SWIFT_HANDSHAKE);
2461     evbuffer_add_32be(evb, 0);  // Initial channel ID
2462     evbuffer_add_8(evb, POPT_END); // Empty protocol options list
2463 
2464     int r = SendTo(socket,addr,evb);
2465     if (r==-1)
2466         print_error("swift can't send datagram");
2467 
2468     evbuffer_free(evb);
2469 }
2470