1 /*
2  *  swift.h
3  *  the main header file for libswift, normally you should only read this one.
4  *
5  *  This implementation supports 2 versions of swift:
6  *  - the original (legacy version)
7  *  - the IETF Peer-to-Peer Streaming Peer Protocol -03 compliant one.
8  *
9  *  Arno: libswift clients should call the swift top-level API which consists of
10  *  swift::Listen, Open, Read, etc. The *Transfer interfaces are internal
11  *  to the library, except in a few exceptional cases.
12  *
13  *  The swift API hides the swarm management mechanism that activates/
14  *  deactivates file-based swarms. A swarm is activated when a *Transfer object
15  *  exists. Live swarms are always active (LiveTransfer). Orthogonal to swarm
16  *  activation is the use of file-based swarms in zero-state mode, which means
17  *  that when activated only the minimal information needed to use the swarm is
18  *  loaded into memory. In particular, content and hashes are read directly
19  *  from disk.
20  *
21  *  Created by Victor Grishchenko, Arno Bakker, Riccardo Petrocco
22  *  Copyright 2009-2016 TECHNISCHE UNIVERSITEIT DELFT. All rights reserved.
23  *
24  */
25 /*
26 
27   The swift protocol (legacy)
28 
29   Messages
30 
31   HANDSHAKE    00, channelid
32   Communicates the channel id of the sender. The
33   initial handshake packet also has the root hash
34   (a HASH message).
35 
36   DATA        01, bin_32, buffer
37   1K of data.
38 
39   ACK         02, bin_32, timestamp_64
40   HAVE        03, bin_32
41   Confirms successful delivery of data. Used for congestion control, as well.
42 
43   REQUEST     08, bin_32
44   Practical value of requests aka "hints" is to avoid overlap, mostly.
45   Hints might be lost in the network or ignored.
46   Peer might send out data without a hint.
47   Hint which was not responded (by DATA) in some RTTs
48   is considered to be ignored.
49   As peers cant pick randomly kilobyte here and there,
50   they send out "long hints" for non-base bins.
51 
52   INTEGRITY   04, bin_32, sha1hash
53   SHA1 hash tree hashes for data verification. The
54   connection to a fresh peer starts with bootstrapping
55   him with peak hashes. Later, before sending out
56   any data, a peer sends the necessary uncle hashes.
57 
58   PEX+/PEX-   05/06, ipv4 addr, port
59   Peer exchange messages; reports all connected and
60   disconnected peers. Might has special meaning (as
61   in the case with swarm supervisors).
62 
63 */
64 #ifndef SWIFT_H
65 #define SWIFT_H
66 
67 // Arno, 2013-06-11: Must come first to ensure SIZE_MAX etc are defined
68 #include "compat.h"
69 #include <deque>
70 #include <vector>
71 #include <set>
72 #include <map>
73 #include <list>
74 #include <algorithm>
75 #include <string>
76 
77 #include <event2/event.h>
78 #include <event2/event_struct.h>
79 #include <event2/buffer.h>
80 #include "bin.h"
81 #include "binmap.h"
82 #include "hashtree.h"
83 #include "livehashtree.h"
84 #include "avgspeed.h"
85 #include "avail.h"
86 #include "exttrack.h"
87 
88 
89 namespace swift
90 {
91 
92 // Arno, 2012-12-12: Configure which PPSP version to use by default. Set to 0 for legacy swift.
93 #define ENABLE_IETF_PPSP_VERSION      1
94 
95 // Whether to try legacy protocol when PPSP handshakes don't result in response
96 #define ENABLE_FALLBACK_TO_LEGACY_PROTO 1
97 
98 // Arno, 2011-12-22: Enable Riccardo's VodPiecePicker
99 #define ENABLE_VOD_PIECEPICKER        0
100 
101 // Arno, 2013-10-02: Configure which live piecepicker: default or with small-swarms optimization
102 #define ENABLE_LIVE_SMALLSWARMOPT_PIECEPICKER      1
103 
104 
105 // Arno, 2013-10-02: Default for mobile devices. Set to 0 to disable.
106 #define DEFAULT_MOBILE_LIVE_DISC_WND_BYTES         (1*1024*1024*1024) // 1 GB
107 
108 // Value for protocol option: Live Discard Window
109 #define POPT_LIVE_DISC_WND_ALL               0xFFFFFFFF // automatically truncated for 32-bit
110 
111 // scheme for swift URLs
112 #define SWIFT_URI_SCHEME              "tswift"
113 
114 // Max incoming connections. Must be set to large value with swift tracker
115 #define SWIFT_MAX_INCOMING_CONNECTIONS      0xffff
116 
117 // Max outgoing connections
118 #define SWIFT_MAX_OUTGOING_CONNECTIONS      20
119 
120 // Max size of the swarm ID protocol option in a HANDSHAKE message.
121 #define POPT_MAX_SWARMID_SIZE            1024
122 // Max size of a X.509 certificate in a PEX_REScert message.
123 #define PEX_RES_MAX_CERT_SIZE            1024
124 
125 // Live streaming via Unified Merkle Tree or Sign All: The number of chunks per signature
126 // Set to 1 for Sign All, set to a power of 2 > 1 for UMT. This MUST be a power of 2.
127 #define SWIFT_DEFAULT_LIVE_NCHUNKS_PER_SIGN   32
128 
129 // Ric: allowed hints in the future (e.g., 2 x TINT_SEC)
130 #define HINT_TIME                       1   // seconds
131 
132 // timeout for the piece picker
133 #define PICKER_TIMEOUT                     2  // seconds
134 
135 // How much time a SIGNED_INTEGRITY timestamp may diverge from current time
136 #define SWIFT_LIVE_MAX_SOURCE_DIVERGENCE_TIME   30 // seconds
137 
138 
139 #define SWIFT_MAX_UDP_OVER_ETH_PAYLOAD        (1500-20-8)
140 // Arno: Maximum size of non-DATA messages in a UDP packet we send.
141 #define SWIFT_MAX_NONDATA_DGRAM_SIZE         (SWIFT_MAX_UDP_OVER_ETH_PAYLOAD-SWIFT_DEFAULT_CHUNK_SIZE-1-4)
142 // Arno: Maximum size of a UDP packet we send. Note: depends on CHUNKSIZE 8192
143 #define SWIFT_MAX_SEND_DGRAM_SIZE            (SWIFT_MAX_NONDATA_DGRAM_SIZE+1+4+8192)
144 // Arno: Maximum size of a UDP packet we are willing to accept. Note: depends on CHUNKSIZE 8192
145 #define SWIFT_MAX_RECV_DGRAM_SIZE            (SWIFT_MAX_SEND_DGRAM_SIZE*2)
146 
147 #define layer2bytes(ln,cs)    (uint64_t)( ((double)cs)*pow(2.0,(double)ln))
148 #define bytes2layer(bn,cs)  (int)log2(  ((double)bn)/((double)cs) )
149 
150 
151 
152 
153     typedef enum {
154         FILE_TRANSFER,
155         LIVE_TRANSFER
156     } transfer_t;
157 
158 
159     struct SwarmID  {
160     public:
SwarmIDSwarmID161         SwarmID() : empty_(true) {}
SwarmIDSwarmID162         SwarmID(const Sha1Hash &roothash) {
163             ttype_ = FILE_TRANSFER;
164             roothash_ = roothash;
165             empty_=false;
166         }
SwarmIDSwarmID167         SwarmID(const SwarmPubKey &spubkey) {
168             ttype_ = LIVE_TRANSFER;
169             spubkey_ = spubkey;
170             empty_=false;
171         }
172         SwarmID(std::string hexstr);
173         SwarmID(uint8_t *data,uint16_t datalength);
174         ~SwarmID();
175         bool    operator == (const SwarmID& b) const;
176         SwarmID & operator = (const SwarmID &source);
177         /** Returns the type of transfer, FILE_TRANSFER or LIVE_TRANSFER */
ttypeSwarmID178         transfer_t      ttype() {
179             return ttype_;
180         }
roothashSwarmID181         const Sha1Hash  &roothash() const {
182             return roothash_;
183         }
spubkeySwarmID184         const SwarmPubKey &spubkey() const {
185             return spubkey_;
186         }
187         std::string     hex() const;
188         std::string     tofilename() const;
SetRootHashSwarmID189         void        SetRootHash(const Sha1Hash &roothash) {
190             ttype_ = FILE_TRANSFER;
191             roothash_ = roothash;
192             empty_=false;
193         }
194 
195         const static SwarmID NOSWARMID;
196 
197     protected:
198         bool        empty_; // if NOSWARMID
199         transfer_t  ttype_;
200         Sha1Hash    roothash_;
201         SwarmPubKey spubkey_;
202     };
203 
204 
205 
206     /** IPv4/6 address, just a nice wrapping around struct sockaddr_storage. */
207     struct Address {
208         struct sockaddr_storage  addr;
209         Address();
210         Address(const char* ip, uint16_t port);
211         /**IPv4 address as "ip:port" or IPv6 address as "[ip]:port" following
212          * RFC2732, or just port in which case the address is set to in6addr_any */
213         Address(const char* ip_port);
214         Address(uint32_t ipv4addr, uint16_t port);
AddressAddress215         Address(const struct sockaddr_storage& address) : addr(address) {}
216         Address(struct in6_addr ipv6addr, uint16_t port);
217 
218         void set_ip(const char* ip_str, int family);
219         void set_port(uint16_t port);
220         void set_port(const char* port_str);
221         void set_ipv4(uint32_t ipv4);
222         void set_ipv4(const char* ipv4_str);
223         void set_ipv6(const char* ip_str);
224         void set_ipv6(struct in6_addr &ipv6);
225         void clear();
226         uint32_t ipv4() const;
227         struct in6_addr ipv6() const;
228         uint16_t port() const;
sockaddr_storageAddress229         operator sockaddr_storage() const {
230             return addr;
231         }
232         bool operator == (const Address& b) const;
233         std::string str() const;
234         std::string ipstr(bool includeport=false) const;
235         bool operator != (const Address& b) const {
236             return !(*this==b);
237         }
238         bool is_private() const;
get_familyAddress239         int get_family() const {
240             return addr.ss_family;
241         }
242         socklen_t get_family_sockaddr_length() const;
243     };
244 
245 
246 // Arno, 2011-10-03: Use libevent callback functions, no on_error?
247 #define sockcb_t        event_callback_fn
248     struct sckrwecb_t {
249         sckrwecb_t (evutil_socket_t s=0, sockcb_t mr=NULL, sockcb_t mw=NULL,
250                     sockcb_t oe=NULL) :
socksckrwecb_t251             sock(s), may_read(mr), may_write(mw), on_error(oe) {}
252         evutil_socket_t sock;
253         sockcb_t   may_read;
254         sockcb_t   may_write;
255         sockcb_t   on_error;
256     };
257 
258     struct now_t  {
259         static tint now;
260     };
261 
262 #define NOW now_t::now
263 
264     /** tintbin is basically a pair<tint,bin64_t> plus some nice operators.
265         Most frequently used in different queues (acknowledgements, requests,
266         etc). */
267     struct tintbin {
268         tint    time;
269         bin_t bin;
tintbintintbin270         tintbin(const tintbin& b) : time(b.time), bin(b.bin) {}
tintbintintbin271         tintbin() : time(TINT_NEVER), bin(bin_t::NONE) {}
tintbintintbin272         tintbin(tint time_, bin_t bin_) : time(time_), bin(bin_) {}
tintbintintbin273         tintbin(bin_t bin_) : time(NOW), bin(bin_) {}
274         bool operator < (const tintbin& b) const {
275             return time > b.time;
276         }
277         bool operator == (const tintbin& b) const {
278             return time==b.time && bin==b.bin;
279         }
280         bool operator != (const tintbin& b) const {
281             return !(*this==b);
282         }
283     };
284 
285     typedef std::deque<tintbin> tbqueue;
286     typedef std::deque<bin_t> binqueue;
287     typedef std::deque< std::pair<tint,tint> > ttqueue;
288     typedef Address   Address;
289 
290 
291     /** A heap (priority queue) for timestamped bin numbers (tintbins). */
292     class tbheap
293     {
294         tbqueue data_;
295     public:
size()296         int size() const {
297             return data_.size();
298         }
is_empty()299         bool is_empty() const {
300             return data_.empty();
301         }
pop()302         tintbin         pop() {
303             tintbin ret = data_.front();
304             std::pop_heap(data_.begin(),data_.end());
305             data_.pop_back();
306             return ret;
307         }
push(const tintbin & tb)308         void            push(const tintbin& tb) {
309             data_.push_back(tb);
310             push_heap(data_.begin(),data_.end());
311         }
peek()312         const tintbin&  peek() const {
313             return data_.front();
314         }
315     };
316 
317 
318     /** swift protocol message types; these are used on the wire. */
319     typedef enum {
320         SWIFT_HANDSHAKE = 0,
321         SWIFT_DATA = 1,
322         SWIFT_ACK = 2,
323         SWIFT_HAVE = 3,
324         SWIFT_INTEGRITY = 4,  // previously SWIFT_HASH
325         SWIFT_PEX_RESv4 = 5,    // previously SWIFT_PEX_ADD
326         SWIFT_PEX_REQ = 6,
327         SWIFT_SIGNED_INTEGRITY = 7, // previously SWIFT_SIGNED_HASH
328         SWIFT_REQUEST = 8,    // previously SWIFT_HINT
329         SWIFT_CANCEL = 9,
330         SWIFT_CHOKE = 10,
331         // SWIFT_RANDOMIZE = 10, //FRAGRAND disabled
332         SWIFT_UNCHOKE = 11,
333         SWIFT_PEX_RESv6 = 12,
334         SWIFT_PEX_REScert = 13,
335         SWIFT_MESSAGE_COUNT = 14
336     } messageid_t;
337 
338     typedef enum {
339         DDIR_UPLOAD,
340         DDIR_DOWNLOAD
341     } data_direction_t;
342 
343 
344     /** Arno: enum to indicate when to send an explicit close to the peer when
345      * doing a local close.
346      */
347     typedef enum {
348         CLOSE_DO_NOT_SEND,
349         CLOSE_SEND,
350         CLOSE_SEND_IF_ESTABLISHED,
351     } close_send_t;
352 
353 
354     typedef enum {
355         VER_SWIFT_LEGACY=0, //legacy swift
356         VER_PPSPP_v1=1      // IETF PPSPP compliant
357     } popt_version_t;
358 
359     // Protocol options defined by IETF PPSPP
360     typedef enum {
361         POPT_VERSION = 0,
362         POPT_MIN_VERSION = 1,
363         POPT_SWARMID = 2,
364         POPT_CONT_INT_PROT = 3,    // content integrity protection method
365         POPT_MERKLE_HASH_FUNC = 4,
366         POPT_LIVE_SIG_ALG = 5,
367         POPT_CHUNK_ADDR = 6,
368         POPT_LIVE_DISC_WND = 7,
369         POPT_SUPP_MSGS = 8,
370         POPT_END = 255
371     } popt_t;
372 
373     typedef enum {
374         POPT_CONT_INT_PROT_NONE = 0,
375         POPT_CONT_INT_PROT_MERKLE = 1,
376         POPT_CONT_INT_PROT_SIGNALL = 2,
377         POPT_CONT_INT_PROT_UNIFIED_MERKLE = 3
378     } popt_cont_int_prot_t;
379 
380     typedef enum {
381         POPT_MERKLE_HASH_FUNC_SHA1 = 0,
382         POPT_MERKLE_HASH_FUNC_SHA224 = 1,
383         POPT_MERKLE_HASH_FUNC_SHA256 = 2,
384         POPT_MERKLE_HASH_FUNC_SHA384 = 3,
385         POPT_MERKLE_HASH_FUNC_SHA512 = 4
386     } popt_merkle_func_t;
387 
388     typedef enum {
389         POPT_CHUNK_ADDR_BIN32 = 0,
390         POPT_CHUNK_ADDR_BYTE64 = 1,
391         POPT_CHUNK_ADDR_CHUNK32 = 2,
392         POPT_CHUNK_ADDR_BIN64 = 3,
393         POPT_CHUNK_ADDR_CHUNK64 = 4
394     } popt_chunk_addr_t;
395 
396     // popt_live_sig_alg_t: See livesig.h
397 
398 
399     class Handshake
400     {
401     public:
402 #if ENABLE_IETF_PPSP_VERSION == 1
Handshake()403         Handshake() : version_(VER_PPSPP_v1), min_version_(VER_PPSPP_v1), merkle_func_(POPT_MERKLE_HASH_FUNC_SHA1),
404             live_sig_alg_(DEFAULT_LIVE_SIG_ALG), chunk_addr_(POPT_CHUNK_ADDR_CHUNK32), live_disc_wnd_(POPT_LIVE_DISC_WND_ALL),
405             swarm_id_ptr_(NULL) {}
406 #else
407         Handshake() : version_(VER_SWIFT_LEGACY), min_version_(VER_SWIFT_LEGACY), merkle_func_(POPT_MERKLE_HASH_FUNC_SHA1),
408             live_sig_alg_(DEFAULT_LIVE_SIG_ALG), chunk_addr_(POPT_CHUNK_ADDR_BIN32), live_disc_wnd_(POPT_LIVE_DISC_WND_ALL),
409             swarm_id_ptr_(NULL) {}
410 #endif
Handshake(Handshake & c)411         Handshake(Handshake &c) {
412             version_ = c.version_;
413             min_version_ = c.min_version_;
414             cont_int_prot_ = c.cont_int_prot_;
415             merkle_func_ = c.merkle_func_;
416             live_sig_alg_ = c.live_sig_alg_;
417             chunk_addr_ = c.chunk_addr_;
418             live_disc_wnd_ = c.live_disc_wnd_;
419             if (c.swarm_id_ptr_ == NULL)
420                 swarm_id_ptr_ = NULL;
421             else
422                 swarm_id_ptr_ = new SwarmID(*(c.swarm_id_ptr_));
423 
424         }
~Handshake()425         ~Handshake() {
426             ReleaseSwarmID();
427         }
SetSwarmID(SwarmID & swarmid)428         void SetSwarmID(SwarmID &swarmid) {
429             swarm_id_ptr_ = new SwarmID(swarmid);
430         }
GetSwarmID()431         SwarmID GetSwarmID() {
432             return (swarm_id_ptr_ == NULL) ? SwarmID::NOSWARMID : *swarm_id_ptr_;
433         }
ReleaseSwarmID()434         void ReleaseSwarmID() {
435             if (swarm_id_ptr_ != NULL) delete swarm_id_ptr_;
436             swarm_id_ptr_ = NULL;
437         }
IsSupported()438         bool IsSupported() {
439             if (cont_int_prot_ == POPT_CONT_INT_PROT_SIGNALL)
440                 return false; // PPSPTODO
441             else if (merkle_func_ >= POPT_MERKLE_HASH_FUNC_SHA224)
442                 return false; // PPSPTODO
443             else if (chunk_addr_ == POPT_CHUNK_ADDR_BYTE64 || chunk_addr_ == POPT_CHUNK_ADDR_BIN64
444                      || chunk_addr_ == POPT_CHUNK_ADDR_CHUNK64)
445                 return false; // PPSPTODO
446             else if (!(live_sig_alg_ == POPT_LIVE_SIG_ALG_RSASHA1 || live_sig_alg_ == POPT_LIVE_SIG_ALG_ECDSAP256SHA256
447                        || live_sig_alg_ == POPT_LIVE_SIG_ALG_ECDSAP384SHA384))
448                 return false; // PPSPTODO
449             return true;
450         }
ResetToLegacy()451         void ResetToLegacy() {
452             // Do not reset peer_channel_id
453             version_ =       VER_SWIFT_LEGACY;
454             min_version_ =   VER_SWIFT_LEGACY;
455             cont_int_prot_ = POPT_CONT_INT_PROT_MERKLE;
456             merkle_func_ =   POPT_MERKLE_HASH_FUNC_SHA1;
457             live_sig_alg_ =  POPT_LIVE_SIG_ALG_PRIVATEDNS;
458             chunk_addr_ =    POPT_CHUNK_ADDR_BIN32;
459             live_disc_wnd_ = (uint32_t)POPT_LIVE_DISC_WND_ALL;
460             live_sig_alg_ =  DEFAULT_LIVE_SIG_ALG;
461         }
462 
463         /**    Peer channel id; zero if we are trying to open a channel. */
464         uint32_t             peer_channel_id_;
465         popt_version_t       version_;
466         popt_version_t       min_version_;
467         popt_cont_int_prot_t cont_int_prot_;
468         popt_merkle_func_t   merkle_func_;
469         popt_live_sig_alg_t  live_sig_alg_;
470         popt_chunk_addr_t    chunk_addr_;
471         uint64_t             live_disc_wnd_;
472     protected:
473         /** Dynamically allocated such that we can deallocate it and
474          * save some bytes per channel */
475         SwarmID              *swarm_id_ptr_;
476     };
477 
478     /** Arno, 2013-09-25: Currently just used for URI processing.
479      * Could be used as args to Open and LiveOpen in future.
480      */
481     class SwarmMeta
482     {
483     public:
SwarmMeta()484         SwarmMeta() : version_(VER_PPSPP_v1), min_version_(VER_PPSPP_v1), cont_int_prot_(POPT_CONT_INT_PROT_MERKLE),
485             merkle_func_(POPT_MERKLE_HASH_FUNC_SHA1),  live_sig_alg_(DEFAULT_LIVE_SIG_ALG), chunk_addr_(POPT_CHUNK_ADDR_CHUNK32),
486             live_disc_wnd_(POPT_LIVE_DISC_WND_ALL), injector_addr_(), chunk_size_(SWIFT_DEFAULT_CHUNK_SIZE), cont_dur_(0),
487             cont_len_(0), ext_tracker_url_(""), mime_type_("") {
488         }
489         popt_version_t      version_;
490         popt_version_t      min_version_;
491         popt_cont_int_prot_t    cont_int_prot_;
492         popt_merkle_func_t  merkle_func_;
493         popt_live_sig_alg_t live_sig_alg_; // UNUSED
494         popt_chunk_addr_t   chunk_addr_;
495         uint64_t        live_disc_wnd_;
496         Address         injector_addr_;
497         uint32_t        chunk_size_;
498         int32_t         cont_dur_;
499         uint64_t        cont_len_;
500         std::string     ext_tracker_url_;
501         std::string     mime_type_;
502     };
503 
504     typedef std::pair<std::string,std::string> stringpair;
505     typedef std::map<std::string,std::string>  parseduri_t;
506     bool ParseURI(std::string uri,parseduri_t &map);
507     std::string URIToSwarmMeta(parseduri_t &map, SwarmMeta *sm);
508 
509     class PiecePicker;
510     //class CongestionController; // Arno: Currently part of Channel. See ::NextSendTime()
511     class Channel;
512     typedef std::vector<Channel *>  channels_t;
513     typedef void (*ProgressCallback)(int td, bin_t bin);
514     typedef std::pair<ProgressCallback,uint8_t> progcallbackreg_t;
515     typedef std::vector<progcallbackreg_t> progcallbackregs_t;
516     typedef std::vector<int>        tdlist_t;
517     class Storage;
518 
519     /*
520      * Superclass for live and video-on-demand
521      */
522     class ContentTransfer : public Operational
523     {
524 
525     public:
526         ContentTransfer(transfer_t ttype);
527         virtual ~ContentTransfer();
528 
529         /** Returns the type of transfer, FILE_TRANSFER or LIVE_TRANSFER */
ttype()530         transfer_t      ttype() {
531             return ttype_;
532         }
533 
534         // Overridable methods
535         /** Returns the global ID for this transfer */
536         virtual SwarmID&     swarm_id() = 0;
537         /** The binmap pointer for data already retrieved and checked. */
538         virtual binmap_t *  ack_out() = 0;
539         /** Returns the number of bytes in a chunk for this transfer */
540         virtual uint32_t chunk_size() = 0;
541         /** Integrity protection via Hash tree */
hashtree()542         HashTree *      hashtree() {
543             return hashtree_;
544         }
545         /** Check whether all components still in working state */
546         virtual void    UpdateOperational() = 0;
547 
548         /** Piece picking strategy used by this transfer. */
picker()549         PiecePicker *   picker() {
550             return picker_;
551         }
552         /** Returns the local ID for this transfer. */
td()553         int             td() const {
554             return td_;
555         }
556         /** Sets the ID for this transfer post create (used by SwarmManager) */
557         void        SetTD(int td);
558         // Gertjan fix: return bool
559         bool            OnPexIn(const Address& addr);
560         // Gertjan
561         Channel *       RandomChannel(Channel *notc);
562         /** Arno: Return the Channel to peer "addr" that is not equal to "notc". */
563         Channel *       FindChannel(const Address &addr, Channel *notc);
564         void            CloseChannels(channels_t delset, bool isall); // do not pass by reference
565         void            GarbageCollectChannels();
566 
567         // RATELIMIT
568         /** Arno: Call when n bytes are received. */
569         void            OnRecvData(int n);
570         /** Arno: Call when n bytes are sent. */
571         void            OnSendData(int n);
572         /** Arno: Call when no bytes are sent due to rate limiting. */
573         void            OnSendNoData();
574         /** Ric:  Call when no bytes are received. */
575         void            OnRecvNoData();
576         /** Arno: Return current speed for the given direction in bytes/s */
577         double          GetCurrentSpeed(data_direction_t ddir);
578         /** Arno: Return maximum speed for the given direction in bytes/s */
579         double          GetMaxSpeed(data_direction_t ddir);
580         /** Arno: Set maximum speed for the given direction in bytes/s */
581         void            SetMaxSpeed(data_direction_t ddir, double m);
582         /** Arno: Return the number of non-seeders current channeled with. */
583         uint32_t        GetNumLeechers();
584         /** Arno: Return the number of seeders current channeled with. */
585         uint32_t        GetNumSeeders();
586 
587         /** Arno: Return (pointer to) the list of Channels for this transfer. MORESTATS */
GetChannels()588         channels_t *    GetChannels() {
589             return &mychannels_;
590         }
591         /** Arno: Return the list of callbacks for this transfer */
GetProgressCallbackRegistrations()592         progcallbackregs_t  GetProgressCallbackRegistrations() {
593             return callbacks_;
594         }
595 
596         // MULTIFILE
GetStorage()597         Storage *       GetStorage() {
598             return storage_;
599         }
600 
601         /** Add a peer to the set of addresses to connect to */
602         void            AddPeer(Address &peer);
SetDefaultHandshake(Handshake & default_hs_out)603         void        SetDefaultHandshake(Handshake &default_hs_out) {
604             def_hs_out_ = default_hs_out;
605         }
GetDefaultHandshake()606         Handshake & GetDefaultHandshake() {
607             return def_hs_out_;
608         }
609 
610         /** Ric: add number of hints for slow start scenario */
SetSlowStartHints(uint32_t hints)611         void            SetSlowStartHints(uint32_t hints) {
612             slow_start_hints_ += hints;
613         }
614         /** Ric: get the # of slow start hints */
GetSlowStartHints()615         uint32_t        GetSlowStartHints() {
616             return slow_start_hints_;
617         }
618 
619         /** Arno: set the tracker for this transfer. Reseting it won't kill
620          * any existing connections. */
SetTracker(std::string trackerurl)621         void            SetTracker(std::string trackerurl) {
622             trackerurl_ = trackerurl;
623         }
624         /** Arno: (Re)Connect to tracker for this transfer, or global Channel::trackerurl if not set */
625         void            ConnectToTracker(bool stop=false);
626         /** Arno: Reconnect to the tracker if no established peers or is connected
627          * to a live source that went silent and exp backoff allows it. */
628         void            ReConnectToTrackerIfAllowed(bool movingforward);
GetExternalTrackerClient()629         ExternalTrackerClient *GetExternalTrackerClient() {
630             return ext_tracker_client_;
631         }
632 
633         /** Progress callback management **/
634         void        AddProgressCallback(ProgressCallback cb, uint8_t agg);
635         void        RemoveProgressCallback(ProgressCallback cb);
636         void        Progress(bin_t bin);  /** Called by channels when data comes in */
637 
638         /** Arno: Callback to do maintenance for all transfers */
639         static void     LibeventGlobalCleanCallback(int fd, short event, void *arg);
640         static struct event evclean; // Global for all Transfers
641         static uint64_t cleancounter;
642 
643     protected:
644         transfer_t      ttype_;
645         SwarmID     swarm_id_;
646         int             td_;    // transfer descriptor as used by swift API.
647         Handshake   def_hs_out_;
648 
649         /** Channels working for this transfer. */
650         channels_t      mychannels_;
651 
652         /** Progress callback management **/
653         progcallbackregs_t callbacks_;
654 
655         /** Piece picker strategy. */
656         PiecePicker*    picker_;
657 
658         // RATELIMIT
659         MovingAverageSpeed    cur_speed_[2];
660         double          max_speed_[2];
661         uint32_t        speedupcount_;
662         uint32_t        speeddwcount_;
663         // MULTIFILE
664         Storage         *storage_;
665 
666         /** HashTree for transfer (either MmapHashTree, ZeroHashTree, LiveHashTree or NULL) */
667         HashTree*       hashtree_;
668 
669         std::string     trackerurl_; // Tracker URL for this transfer
670         tint            tracker_retry_interval_;
671         tint            tracker_retry_time_;
672         ExternalTrackerClient *ext_tracker_client_; // if external tracker
673         // Ric: slow start 4 requesting hints
674         uint32_t        slow_start_hints_;
675 
676     };
677 
678 
679     /** A class representing a file/VOD transfer of one or multiple files */
680     class    FileTransfer : public ContentTransfer
681     {
682 
683     public:
684         /** A constructor. Open/submit/retrieve a file.
685          *  @param file_name    the name of the file
686          *  @param root_hash    the root hash of the file; zero hash if the file
687         *                      is newly submitted
688          *  @param force_check_diskvshash whether to force a check of disk versus hashes
689          *  @param check_netwvshash   whether to hash check chunk on receipt
690          *  @param chunk_size size of chunk to use
691          *  @param zerostate  whether to serve the hashes + content directly from disk
692          */
693         FileTransfer(int td, std::string file_name, const Sha1Hash& root_hash=Sha1Hash::ZERO, bool force_check_diskvshash=true,
694                      popt_cont_int_prot_t cipm=POPT_CONT_INT_PROT_MERKLE, uint32_t chunk_size=SWIFT_DEFAULT_CHUNK_SIZE, bool zerostate=false,
695                      std::string metadir="");
696         /**    Close everything. */
697         ~FileTransfer();
698 
699         // ContentTransfer overrides
700 
swarm_id()701         SwarmID& swarm_id() {
702             swarm_id_.SetRootHash(hashtree_->root_hash());
703             return swarm_id_;
704         }
705         /** The binmap pointer for data already retrieved and checked. */
ack_out()706         binmap_t *      ack_out()  {
707             return hashtree_->ack_out();
708         }
709         /** Piece picking strategy used by this transfer. */
chunk_size()710         uint32_t    chunk_size() {
711             return hashtree_->chunk_size();
712         }
713         /** Check whether all components still in working state */
714         void        UpdateOperational();
715 
716         // FileTransfer specific methods
717 
718         /** Ric: the availability in the swarm */
availability()719         Availability*   availability() {
720             return availability_;
721         }
722         //ZEROSTATE
723         /** Returns whether this FileTransfer is running in zero-state mode,
724          * meaning that the hash tree is not mmapped into memory but read
725          * directly from disk, and other memory saving measures.
726          */
IsZeroState()727         bool        IsZeroState() {
728             return zerostate_;
729         }
730 
731     protected:
732         // Ric: PPPLUG
733         /** Availability in the swarm */
734         Availability*   availability_;
735 
736         //ZEROSTATE
737         bool            zerostate_;
738     };
739 
740     /** A class representing a live transfer. */
741     class    LiveTransfer : public ContentTransfer
742     {
743     public:
744 
745         /** A constructor for a live source. */
746         LiveTransfer(std::string filename, KeyPair &keypair, std::string checkpoint_filename, popt_cont_int_prot_t cipm,
747                      uint64_t disc_wnd=POPT_LIVE_DISC_WND_ALL, uint32_t nchunks_per_sign=SWIFT_DEFAULT_LIVE_NCHUNKS_PER_SIGN,
748                      uint32_t chunk_size=SWIFT_DEFAULT_CHUNK_SIZE);
749 
750         /** A constructor for live client. */
751         LiveTransfer(std::string filename, SwarmID &swarmid, Address &srcaddr, popt_cont_int_prot_t cipm,
752                      uint64_t disc_wnd=POPT_LIVE_DISC_WND_ALL, uint32_t chunk_size=SWIFT_DEFAULT_CHUNK_SIZE);
753 
754         /**  Close everything. */
755         ~LiveTransfer();
756 
757         // ContentTransfer overrides
758 
swarm_id()759         SwarmID&  swarm_id() {
760             return swarm_id_;
761         }
762         /** The binmap for data already retrieved and checked. */
763         binmap_t *      ack_out();
764         /** Returns the number of bytes in a chunk for this transmission */
chunk_size()765         uint32_t        chunk_size() {
766             return chunk_size_;
767         }
768         /** Check whether all components still in working state */
769         void        UpdateOperational();
770 
771         // LiveTransfer specific methods
772 
773         /** Returns the number of bytes that are complete sequentially, starting from the
774              hookin offset, till the first not-yet-retrieved packet. */
775         uint64_t        SeqComplete();
776 
777         /** Returns whether this transfer is the source */
am_source()778         bool            am_source() {
779             return am_source_;
780         }
781 
782         /** Source: add a chunk to the swarm */
783         int             AddData(const void *buf, uint32_t nbyte);
784 
785         /** Source: announce only chunks under signed munros */
786         void            UpdateSignedAckOut();
787 
788 
789         /** Returns the byte offset at which we hooked into the live stream */
790         uint64_t        GetHookinOffset();
791 
792         /** Source: Returns current write position in storage file */
GetSourceWriteOffset()793         uint64_t        GetSourceWriteOffset() {
794             return offset_;
795         }
796 
797         // SIGNPEAK
798         /** Source: Return all chunks in ack_out_ covered by peaks */
799         binmap_t *      ack_out_signed();
800 
801         /** Received a correctly signed munro hash with timestamp sourcet */
802         void        OnVerifiedMunroHash(bin_t munro, tint sourcet);
803 
804         /** If live discard window is used, purge unused parts of tree.
805          * pos is last received chunk. */
806         void            OnDataPruneTree(Handshake &hs_out, bin_t pos, uint32_t nchunks2forget);
807 
808         // Arno: FileTransfers are managed by the SwarmManager which
809         // activates/deactivates them as required. LiveTransfers are unmanaged.
810         /** Find transfer by the transfer descriptor. */
811         static LiveTransfer* FindByTD(int td);
812         /** Find transfer by the swarm ID. */
813         static LiveTransfer* FindBySwarmID(const SwarmID& swarmid);
814         /** Return list of transfer descriptors of all LiveTransfers */
815         static tdlist_t GetTransferDescriptors();
816         /** Add this LiveTransfer to the global list */
817         void GlobalAdd();
818         /** Remove this LiveTransfer for the global list */
819         void GlobalDel();
820 
821         // LIVECHECKPOINT
822         int WriteCheckpoint(BinHashSigTuple &roottup);
823         BinHashSigTuple ReadCheckpoint();
824 
825         /** Source: returns current last_chunkid_ as bin */
826         bin_t       GetSourceCurrentPos();
827 
828         /** Client: return source address */
GetSourceAddress()829         Address     GetSourceAddress() {
830             return srcaddr_;
831         }
832 
833     protected:
834         /**    Binmap of own chunk availability, when not POPT_CONT_INT_PROT_UNIFIED_MERKLE (so _NONE or _SIGNALL) */
835         binmap_t        ack_out_;
836         /**    Binmap of own chunk availability restricted to current signed peaks SIGNPEAK */
837         binmap_t    signed_ack_out_;
838         /**    Bin of right-most received chunk LIVE */
839         bin_t       ack_out_right_basebin_; // FUTURE: make part of binmap_t?
840         // CHUNKSIZE
841         /** Arno: configurable fixed chunk size in bytes */
842         uint32_t         chunk_size_;
843 
844         /** Source: Am I a source */
845         bool            am_source_;
846         /** Name of file used for storing live chunks */
847         std::string     filename_;
848         /** Source: ID of last generated chunk */
849         uint64_t        last_chunkid_;
850         /** Source: Current write position in storage file */
851         uint64_t        offset_;
852 
853         /** Source: Count of chunks generated since last signed peak epoch */
854         uint32_t        chunks_since_sign_;
855 
856         // LIVECHECKPOINT
857         /** Filename to store source checkpoint */
858         std::string checkpoint_filename_;
859         /** bin of old tree, which becomes new peak but must announced as
860          * being in possession to others */
861         bin_t       checkpoint_bin_;
862 
863         /** Client: Source address for chunk picker and protocol optimization */
864         Address     srcaddr_;
865 
866         /** Arno: global list of LiveTransfers, which are not managed via SwarmManager */
867         static std::vector<LiveTransfer*> liveswarms;
868 
869         /** Joint constructor code between source and client */
870         void Initialize(KeyPair &keypair, popt_cont_int_prot_t cipm, uint64_t disc_wnd,uint32_t nchunks_per_sign);
871     };
872 
873 
874     /** PiecePicker implements some strategy of choosing (picking) what
875         to request next, given the possible range of choices:
876         data acknowledged by the peer minus data already retrieved.
877         May pick sequentially, do rarest first or in some other way. */
878     class PiecePicker
879     {
880     public:
881         virtual void    Randomize(uint64_t twist) = 0;
882         /** The piece picking method itself.
883          *  @param  offered     the data acknowledged by the peer
884          *  @param  max_width   maximum number of packets to ask for
885          *  @param  expires     (not used currently) when to consider request expired
886          *  @return             the bin number to request */
887         virtual bin_t   Pick(binmap_t& offered, uint64_t max_width, tint expires, uint32_t channelid) = 0;
888         virtual void    LimitRange(bin_t range) = 0;
889         /** updates the playback position for streaming piece picking.
890          *  @param  offbin        bin number of new playback pos
891          *  @param  whence      only SEEK_CUR supported */
892         virtual int     Seek(bin_t offbin, int whence) = 0;
~PiecePicker()893         virtual         ~PiecePicker() {}
894     };
895 
896     class LivePiecePicker : public PiecePicker
897     {
898     public:
899         /** Arno: Register the last munro sent by a peer, to be able to choose
900          * a hook-in point. */
901         virtual void    AddPeerMunro(bin_t munro, tint sourcet) = 0;
902         /** Returns the bin at which we hooked into the live stream. */
903         virtual bin_t   GetHookinPos() = 0;
904         /** Returns the bin in the live stream we currently want to download. */
905         virtual bin_t   GetCurrentPos() = 0;
906     };
907 
908 
909     /**    swift channel's "control block"; channels loosely correspond to TCP
910        connections or FTP sessions; one channel is created for one file
911        being transferred between two peers. As we don't need buffers and
912        lots of other TCP stuff, sizeof(Channel+members) must be below 1K.
913        Normally, API users do not deal with this class. */
914     class Channel
915     {
916 
917     public:
918         Channel(ContentTransfer* transfer, int socket=INVALID_SOCKET, Address peer=Address());
919         ~Channel();
920 
921         typedef enum {
922             KEEP_ALIVE_CONTROL,
923             PING_PONG_CONTROL,
924             SLOW_START_CONTROL,
925             AIMD_CONTROL,
926             LEDBAT_CONTROL,
927             CLOSE_CONTROL
928         } send_control_t;
929 
930 
931         typedef enum {
932             NONE,
933             NOTHING_TO_SEND,
934             NOTHIG_RECEIVED,
935             PING_PONG_NO_RESPONSE,
936             LONG_SEND_INTERVAL
937         } send_control_reason_t;
938 
939 #define DGRAM_MAX_SOCK_OPEN 128
940         static int sock_count;
941         static sckrwecb_t sock_open[DGRAM_MAX_SOCK_OPEN];
942         static std::string  trackerurl; // Global tracker for all transfers
943         static struct event_base *evbase;
944         static struct event evrecv;
945         static const char* SEND_CONTROL_MODES[];
946 
947         static tint     epoch, start;
948         static uint64_t global_dgrams_up, global_dgrams_down, global_raw_bytes_up, global_raw_bytes_down, global_bytes_up,
949                global_bytes_down;
950         static void     CloseChannelByAddress(const Address &addr);
951 
952         // SOCKMGMT
953         // Arno: channel is also a "singleton" class that manages all sockets
954         // for a swift process
955         static void     LibeventSendCallback(int fd, short event, void *arg);
956         static void     LibeventReceiveCallback(int fd, short event, void *arg);
957         static void     RecvDatagram(evutil_socket_t socket);  // Called by LibeventReceiveCallback
958         static int      RecvFrom(evutil_socket_t sock, Address& addr, struct evbuffer *evb); // Called by RecvDatagram
959         static int      SendTo(evutil_socket_t sock, const Address& addr, struct evbuffer *evb); // Called by Channel::Send()
960         static evutil_socket_t Bind(Address address, sckrwecb_t callbacks=sckrwecb_t());
961         static Address  BoundAddress(evutil_socket_t sock);
default_socket()962         static evutil_socket_t default_socket() {
963             return sock_count ? sock_open[0].sock : INVALID_SOCKET;
964         }
965 
966         /** close the port */
967         static void     CloseSocket(evutil_socket_t sock);
968         static void     Shutdown();
969         /** the current time */
970         static tint     Time();
971         static tint     last_tick;
972         // Arno: send explicit close outside Channel context
973         static void     StaticSendClose(evutil_socket_t socket,Address &addr, uint32_t peer_channel_id);
974 
975         // Ric: used for testing LEDBAT's behaviour
GetCwnd()976         float       GetCwnd() {
977             return cwnd_;
978         }
GetHintSize(data_direction_t ddir)979         uint64_t    GetHintSize(data_direction_t ddir) {
980             return ddir ? hint_out_size_ : hint_in_size_;
981         }
982         bool        Totest;
983         bool        Tocancel;
984 
985         // Arno: Per instance methods
986         void        Recv(struct evbuffer *evb);
987         void        Send();   // Called by LibeventSendCallback
988         void        Close(close_send_t closesend);
ClearTransfer()989         void        ClearTransfer() {
990             transfer_ = NULL;    // for swarm cleanup
991         }
992 
993         void        OnAck(struct evbuffer *evb);
994         void        OnHave(struct evbuffer *evb);
995         void        OnHaveLive(bin_t ackd_pos);
996         bin_t       OnData(struct evbuffer *evb);
997         void        OnHint(struct evbuffer *evb);
998         void        OnHash(struct evbuffer *evb);
999         void        OnPexAdd(struct evbuffer *evb, int family);
1000         void        OnPexAddCert(struct evbuffer *evb);
1001         static Handshake *StaticOnHandshake(Address &addr, uint32_t cid, bool ver_known, popt_version_t ver,
1002                                             struct evbuffer *evb);
1003         void        OnHandshake(Handshake *hishs);
1004         void        OnCancel(struct evbuffer *evb); // PPSP
1005         void        OnChoke(struct evbuffer *evb);
1006         void        OnUnchoke(struct evbuffer *evb);
1007         void        OnSignedHash(struct evbuffer *evb);
1008         void        AddHandshake(struct evbuffer *evb);
1009         bin_t       AddData(struct evbuffer *evb);
1010         void        SendIfTooBig(struct evbuffer *evb);
1011         void        AddAck(struct evbuffer *evb);
1012         void        AddHave(struct evbuffer *evb);
1013         void        AddHint(struct evbuffer *evb);
1014         void        AddCancel(struct evbuffer *evb);
1015         void        AddRequiredHashes(struct evbuffer *evb, bin_t pos, bool isretransmit);
1016         void        AddUnsignedPeakHashes(struct evbuffer *evb);
1017         void        AddFileUncleHashes(struct evbuffer *evb, bin_t pos);
1018         void        AddLiveSignedMunroHash(struct evbuffer *evb,bin_t munro); // SIGNMUNRO
1019         void        AddLiveUncleHashes(struct evbuffer *evb, bin_t pos, bin_t munro, bool isretransmit);  // SIGNMUNRO
1020         void        AddPex(struct evbuffer *evb);
1021         void        OnPexReq(void);
1022         void        AddPexReq(struct evbuffer *evb);
1023         void        BackOffOnLosses(float ratio=0.5);
1024         tint        SwitchSendControl(send_control_t control_mode);
1025         tint        NextSendTime();
1026         tint        KeepAliveNextSendTime();
1027         tint        PingPongNextSendTime();
1028         tint        CwndRateNextSendTime();
1029         tint        SlowStartNextSendTime();
1030         tint        AimdNextSendTime();
1031         tint        LedbatNextSendTime();
1032         /** Arno: return true if this peer has complete file. May be fuzzy if Peak Hashes not in */
1033         bool        IsComplete();
1034         /** Arno: return (UDP) port for this channel */
1035         uint16_t    GetMyPort();
1036         bool        IsDiffSenderOrDuplicate(Address addr, uint32_t chid);
1037 
1038         static int  MAX_REORDERING;
1039         static tint TIMEOUT;
1040         static tint MIN_DEV;
1041         static tint MAX_SEND_INTERVAL;
1042         static tint LEDBAT_TARGET;
1043         static float LEDBAT_GAIN;
1044         static tint LEDBAT_DELAY_BIN;
1045         static uint32_t LEDBAT_BASE_HISTORY;
1046         static uint32_t LEDBAT_ROLLOVER;
1047         static bool SELF_CONN_OK;
1048         static tint MAX_POSSIBLE_RTT;
1049         static tint MIN_PEX_REQUEST_INTERVAL;
1050         static FILE* debug_file;
1051         // Only in devel: file used to debug LEDBAT
1052         static FILE* debug_ledbat;
1053 
1054         const std::string id_string() const;
1055         /** A channel is "established" if had already sent and received packets. */
1056         bool        is_established();
1057         HashTree *  hashtree();
transfer()1058         ContentTransfer *transfer() {
1059             return transfer_;
1060         }
peer()1061         const Address& peer() const {
1062             return peer_;
1063         }
recv_peer()1064         const Address& recv_peer() const {
1065             return recv_peer_;
1066         }
ack_timeout()1067         tint        ack_timeout() {
1068             tint dev = dev_avg_ < MIN_DEV ? MIN_DEV : dev_avg_;
1069             tint tmo = rtt_avg_ + dev * 4;
1070             return tmo < 30*TINT_SEC ? tmo : 30*TINT_SEC;
1071         }
id()1072         uint32_t    id() const {
1073             return id_;
1074         }
ack_in()1075         const binmap_t& ack_in() const {
1076             return ack_in_;
1077         }
1078 
1079         // MORESTATS
raw_bytes_up()1080         uint64_t    raw_bytes_up() {
1081             return raw_bytes_up_;
1082         }
raw_bytes_down()1083         uint64_t    raw_bytes_down() {
1084             return raw_bytes_down_;
1085         }
bytes_up()1086         uint64_t    bytes_up() {
1087             return bytes_up_;
1088         }
bytes_down()1089         uint64_t    bytes_down() {
1090             return bytes_down_;
1091         }
1092 
1093         static int  DecodeID(int scrambled);
1094         static int  EncodeID(int unscrambled);
channel(int i)1095         static Channel* channel(int i) {
1096             return i<channels.size()?channels[i]:NULL;
1097         }
1098 
1099         // SAFECLOSE
1100         void        ClearEvents();
Schedule4Delete()1101         void        Schedule4Delete() {
1102             scheduled4del_ = true;
1103         }
IsScheduled4Delete()1104         bool        IsScheduled4Delete() {
1105             return scheduled4del_;
1106         }
1107 
1108         //ZEROSTATE
1109         // Message handler replacements
1110         void        OnDataZeroState(struct evbuffer *evb);
1111         void        OnHaveZeroState(struct evbuffer *evb);
1112         void        OnHashZeroState(struct evbuffer *evb);
1113         void        OnPexAddZeroState(struct evbuffer *evb, int family);
1114         void        OnPexAddCertZeroState(struct evbuffer *evb);
1115         void        OnPexReqZeroState(struct evbuffer *evb);
GetOpenTime()1116         tint        GetOpenTime() {
1117             return open_time_;
1118         }
1119 
1120         // LIVE
1121         /** Arno: Called when source generates chunk. */
1122         void        LiveSend();
1123         bool        PeerIsSource();
GetLastRecvTime()1124         tint        GetLastRecvTime() {
1125             return last_recv_time_;
1126         }
1127 
1128         void        CloseOnError();
1129 
1130         // MOVINGFWD
1131         /** Whether or not channel is uploading when seeder, or downloading when leecher */
1132         bool        IsMovingForward();
1133 
1134     protected:
1135         struct event    *evsend_ptr_; // Arno: timer per channel // SAFECLOSE
1136         //LIVE
1137         struct event    *evsendlive_ptr_; // Arno: timer per channel
1138 
1139         /** Channel id: index in the channel array. */
1140         uint32_t    id_;
1141         /**    Socket address of the peer. */
1142         Address     peer_;
1143         /**    The UDP socket fd. */
1144         evutil_socket_t      socket_;
1145         /**    Descriptor of the file in question. */
1146         ContentTransfer*    transfer_;
1147         bool        own_id_mentioned_;
1148         /**    Peer's progress, based on acknowledgements. */
1149         binmap_t    ack_in_;
1150         /**    Bin of right-most acked chunk LIVE */
1151         bin_t       ack_in_right_basebin_; // FUTURE: make part of binmap_t?
1152 
1153         /**    Last data received; needs to be acked immediately. */
1154         tintbin     data_in_;
1155         bin_t       data_in_dbl_;
1156         /** The history of data sent and still unacknowledged. */
1157         tbqueue     data_out_;
1158         uint32_t    data_out_size_; // pkts not acknowledged
1159         /** Timeouted data (potentially to be retransmitted). */
1160         tbqueue     data_out_tmo_; // it contains only leaf bins
1161         bin_t       data_out_cap_; // Ric: maybe we should remove it.. creates problems if lost
1162         /** Index in the history array. */
1163         binmap_t    have_out_;
1164         /**    Transmit schedule: in most cases filled with the peer's hints */
1165         tbqueue     hint_in_;
1166         uint64_t    hint_in_size_;
1167         /** Hints sent (to detect and reschedule ignored hints). */
1168         tbqueue     hint_out_;
1169         uint64_t    hint_out_size_;
1170         /** Hints queued to be sent. */
1171         tbqueue     hint_queue_out_;
1172         uint64_t    hint_queue_out_size_;
1173         /** Ric: hints that are removed from the hint_out_ queue and need to be canceled */
1174         std::deque<bin_t> cancel_out_;
1175         /** Types of messages the peer accepts. */
1176         uint64_t    cap_in_;
1177         /** PEX progress */
1178         bool        pex_requested_;
1179         tint        last_pex_request_time_;
1180         tint        next_pex_request_time_;
1181         bool        pex_request_outstanding_;
1182         tbqueue
1183         reverse_pex_out_;        // Arno, 2011-10-03: should really be a queue of (tint,channel id(= uint32_t)) pairs.
1184         int         useless_pex_count_;
1185         /** Smoothed averages for RTT, RTT deviation and data interarrival periods. */
1186         tint        rtt_avg_, dev_avg_, dip_avg_;
1187         tint        last_send_time_;
1188         tint        last_recv_time_;
1189         tint        last_data_out_time_;
1190         tint        last_data_in_time_;
1191         tint        last_loss_time_;
1192         tint        next_send_time_;
1193         tint        open_time_;
1194         /** Congestion window; TODO: int, bytes. */
1195         float       cwnd_;
1196         int         cwnd_count1_;
1197         /** Data sending interval. */
1198         tint        send_interval_;
1199         /** The congestion control strategy. */
1200         send_control_t         send_control_;
1201         /** Datagrams (not data) sent since last recv.    */
1202         int         sent_since_recv_;
1203 
1204         /** Arno: Fix for KEEP_ALIVE_CONTROL */
1205         bool        lastrecvwaskeepalive_;
1206         bool        lastsendwaskeepalive_;
1207         send_control_reason_t keepalivereason_;
1208         /** Arno: For live, we may receive a HAVE but have no hints
1209             outstanding. In that case we should not wait till next_send_time_
1210             but request directly. See send_control.cpp */
1211         bool        live_have_no_hint_;
1212 
1213         /** Recent acknowlegements for data previously sent. */
1214         int         ack_rcvd_recent_; // Arno, 2013-07-01: appears broken at the moment
1215         /** Recent non-acknowlegements (losses) of data previously sent.    */
1216         int         ack_not_rcvd_recent_;
1217         /** LEDBAT one-way delay machinery */
1218         tint        owd_min_bins_[10];
1219         int         owd_min_bin_;
1220         tint        owd_min_bin_start_;
1221         tint        owd_cur_;
1222         tint        owd_min_;
1223         /** LEDBAT current delay list should be > 4 && == RTT */
1224         ttqueue     owd_current_;
1225         ttqueue     dip_list_; // Ric: a list of dip values for smoothed avg
1226         /** Stats */
1227         int         dgrams_sent_;
1228         int         dgrams_rcvd_;
1229         // Arno, 2011-11-28: for detailed, per-peer stats. MORESTATS
1230         uint64_t    raw_bytes_up_, raw_bytes_down_, bytes_up_, bytes_down_;
1231         uint64_t    old_movingfwd_bytes_;
1232 
1233         // SAFECLOSE
1234         bool        scheduled4del_;
1235         /** Arno: Socket address of the peer where packets are received from,
1236          * when an IANA private address, otherwise 0.
1237          * May not be equal to peer_. 2PEERSBEHINDSAMENAT */
1238         Address     recv_peer_;
1239 
1240         // keep memory of previous delays
1241         tint        reschedule_delay_;
1242 
1243         // PPSP
1244         /** Handshake I sent to peer. swarmid not set. */
1245         Handshake   *hs_out_;
1246         /** Handshake I got from peer. */
1247         Handshake   *hs_in_;
1248 
1249         // SIGNMUNRO
1250         bin_t       last_sent_munro_;
1251         bool        munro_ack_rcvd_;
1252 
1253         // RTTCS
1254         tintbin     rtt_hint_tintbin_;
1255 
PeerBPS()1256         int         PeerBPS() const {
1257             return TINT_SEC / dip_avg_ * 1024;
1258         }
1259         /** Get a request for one packet from the queue of peer's requests. */
1260         bin_t       DequeueHint(bool *retransmitptr);
1261         bin_t       ImposeHint();
1262         void        TimeoutDataOut();
1263         void        CleanStaleHintOut();
1264         void        CleanHintOut(bin_t pos);
1265         void        Reschedule();
1266         void        UpdateDIP(bin_t pos); // RETRANSMIT
1267         void        UpdateRTT(tint owd);
1268 
1269         bin_t       DequeueHintOut(uint64_t size);
1270 
1271         // Arno, 2012-06-14: Replace with hashtable (unsorted_map). This
1272         // currently grows for ever, filling with NULLs for old channels
1273         // and results in channel IDs with are not really random.
1274         //
1275         static channels_t channels;
1276     };
1277 
1278 
1279     // MULTIFILE
1280     /*
1281      * Class representing a single file in a multi-file swarm.
1282      */
1283     class StorageFile : public Operational
1284     {
1285     public:
1286         StorageFile(std::string specpath, int64_t start, int64_t size, std::string ospath);
1287         ~StorageFile();
GetStart()1288         int64_t GetStart() {
1289             return start_;
1290         }
GetEnd()1291         int64_t GetEnd() {
1292             return end_;
1293         }
GetSize()1294         int64_t GetSize() {
1295             return end_+1-start_;
1296         }
GetSpecPathName()1297         std::string GetSpecPathName() {
1298             return spec_pathname_;
1299         }
GetOSPathName()1300         std::string GetOSPathName() {
1301             return os_pathname_;
1302         }
Write(const void * buf,size_t nbyte,int64_t offset)1303         ssize_t  Write(const void *buf, size_t nbyte, int64_t offset) {
1304             return pwrite(fd_,buf,nbyte,offset);
1305         }
Read(void * buf,size_t nbyte,int64_t offset)1306         ssize_t  Read(void *buf, size_t nbyte, int64_t offset) {
1307             return pread(fd_,buf,nbyte,offset);
1308         }
ResizeReserved()1309         int ResizeReserved() {
1310             return file_resize(fd_,GetSize());
1311         }
1312 
1313     protected:
1314         std::string spec_pathname_;
1315         std::string os_pathname_;
1316         int64_t     start_;
1317         int64_t     end_;
1318 
1319         int         fd_; // actual fd
1320     };
1321 
1322     typedef std::vector<StorageFile *>    storage_files_t;
1323 
1324     /*
1325      * Class representing the persistent storage layer. Supports a swarm
1326      * stored as multiple files.
1327      *
1328      * This is implemented by storing a multi-file specification in chunk 0
1329      * (and further if needed). This spec lists what other files the swarm
1330      * contains and their sizes. E.g.
1331      *
1332      * META-INF-multifilespec.txt 113
1333      * seeder/190557.ts 249798796
1334      * seeder/berta.dat 2395920988
1335      * seeder/bunny.ogg 166825767
1336      *
1337      * The concatenation of these files (starting with the multi-file spec with
1338      * pseudo filename META-INF-multifile-spec.txt) are the contents of the
1339      * swarm.
1340      */
1341     class Storage : public Operational
1342     {
1343 
1344     public:
1345 
1346         static const std::string MULTIFILE_PATHNAME;
1347         static const std::string MULTIFILE_PATHNAME_FILE_SEP;
1348         static const int         MULTIFILE_MAX_PATH = 2048;
1349         static const int         MULTIFILE_MAX_LINE = MULTIFILE_MAX_PATH+1+32+1;
1350 
1351         typedef enum {
1352             STOR_STATE_INIT,
1353             STOR_STATE_MFSPEC_SIZE_KNOWN,
1354             STOR_STATE_MFSPEC_COMPLETE,
1355             STOR_STATE_SINGLE_FILE,
1356             STOR_STATE_SINGLE_LIVE_WRAP  // single file containing just live discard window
1357         } storage_state_t;
1358 
1359         /** StorageFile for every file in this transfer */
1360         typedef std::vector<StorageFile *>    storage_files_t;
1361 
1362         /** convert multi-file spec filename (UTF-8 encoded Unicode) to OS name and vv. */
1363         static std::string spec2ospn(std::string specpn);
1364         static std::string os2specpn(std::string ospn);
1365 
1366         /** Create Storage from specified path and destination dir if content turns about to be a multi-file.
1367          * If live_disc_wnd_bytes !=0 then live single-file, wrapping if != POPT_LIVE_DISC_WND_ALL */
1368         Storage(std::string ospathname, std::string destdir, int td, uint64_t live_disc_wnd_bytes,
1369                 std::string metamfspecospathname="");
1370         ~Storage();
1371 
1372         /** UNIX pread approximation. Does change file pointer. Thread-safe if no concurrent writes */
1373         ssize_t     Read(void *buf, size_t nbyte, int64_t offset); // off_t not 64-bit dynamically on Win32
1374 
1375         /** UNIX pwrite approximation. Does change file pointer. Is not thread-safe */
1376         ssize_t     Write(const void *buf, size_t nbyte, int64_t offset);
1377 
1378         /** Link to HashTree */
SetHashTree(HashTree * ht)1379         void        SetHashTree(HashTree *ht) {
1380             ht_ = ht;
1381         }
1382 
1383         /** Size of content according to multi-file spec, -1 if unknown or single file */
1384         int64_t     GetSizeFromSpec();
1385 
1386         /** Size reserved for storage */
1387         int64_t     GetReservedSize();
1388 
1389         /** 0 for single file, spec size for multi-file */
1390         int64_t     GetMinimalReservedSize();
1391 
1392         /** Change size reserved for storage */
1393         int         ResizeReserved(int64_t size);
1394 
1395         /** Return the operating system path for this Storage */
GetOSPathName()1396         std::string GetOSPathName() {
1397             return os_pathname_;
1398         }
1399 
1400         /** Return the root hash of the content being stored */
roothashhex()1401         std::string roothashhex() {
1402             if (ht_ == NULL) return "0000000000000000000000000000000000000000";
1403             else return ht_->root_hash().hex();
1404         }
1405 
1406         /** Return the destination directory for this Storage */
GetDestDir()1407         std::string GetDestDir() {
1408             return destdir_;
1409         }
1410 
1411         /** Whether Storage is ready to be used */
IsReady()1412         bool        IsReady() {
1413             return state_ == STOR_STATE_SINGLE_FILE || STOR_STATE_SINGLE_LIVE_WRAP || state_ == STOR_STATE_MFSPEC_COMPLETE;
1414         }
1415 
1416         /** Return the list of StorageFiles for this Storage, empty if not multi-file */
GetStorageFiles()1417         storage_files_t    GetStorageFiles() {
1418             return sfs_;
1419         }
1420 
1421         /** Return a one-time callback when swift starts allocating disk space */
AddOneTimeAllocationCallback(ProgressCallback cb)1422         void AddOneTimeAllocationCallback(ProgressCallback cb) {
1423             alloc_cb_ = cb;
1424         }
1425 
1426         /** Sets the transfer descriptor for this storage obj post create (used by SwarmManager) */
SetTD(int td)1427         void SetTD(int td) {
1428             td_ = td;
1429         }
1430 
1431 
1432     protected:
1433         storage_state_t    state_;
1434 
1435         std::string os_pathname_;
1436         std::string destdir_;
1437 
1438         /** HashTree this Storage is linked to */
1439         HashTree    *ht_;
1440 
1441         int64_t     spec_size_;
1442 
1443         storage_files_t    sfs_;
1444         int         single_fd_;
1445         int64_t     reserved_size_;
1446         int64_t     total_size_from_spec_;
1447         StorageFile *last_sf_;
1448 
1449         int         td_; // transfer ID of the *Transfer we're part of.
1450         ProgressCallback alloc_cb_;
1451         uint64_t    live_disc_wnd_bytes_;
1452 
1453         std::string meta_mfspec_os_pathname_; // metadata might be located in a different dir
1454 
1455         int         WriteSpecPart(StorageFile *sf, const void *buf, size_t nbyte, int64_t offset);
1456         std::pair<int64_t,int64_t> WriteBuffer(StorageFile *sf, const void *buf, size_t nbyte, int64_t offset);
1457         StorageFile * FindStorageFile(int64_t offset);
1458         int         ParseSpec(StorageFile *sf);
1459         int         OpenSingleFile();
1460 
1461     };
1462 
1463     /*
1464      * Manager for starting on-demand transfers that serve content and hashes
1465      * directly from disk (so little state in memory). Requires content (named
1466      * as roothash-in-hex), hashes (roothash-in-hex.mhash file) and checkpoint
1467      * (roothash-in-hex.mbinmap) to be present on disk.
1468      */
1469     class ZeroState
1470     {
1471     public:
1472         ZeroState();
1473         ~ZeroState();
1474         static ZeroState *GetInstance();
1475         void SetContentDir(std::string contentdir);
1476         void SetMetaDir(std::string metadir);
1477         void SetConnectTimeout(tint timeout);
1478         int Find(const Sha1Hash &root_hash);
1479 
1480         static void LibeventCleanCallback(int fd, short event, void *arg);
1481 
1482     protected:
1483         static ZeroState *__singleton;
1484 
1485         struct event    evclean_;
1486         std::string     contentdir_;
1487         std::string     metadir_;
1488 
1489         /* Arno, 2012-07-20: A very slow peer can keep a transfer alive
1490           for a long time (3 minute channel close timeout not reached).
1491           This causes problems on Mac where there are just 256 file
1492           descriptors per process and this problem causes all of them
1493           to be used.
1494         */
1495         tint        connect_timeout_;
1496     };
1497 
1498 
1499     /*************** The top-level API ****************/
1500     // See api.cpp for the implementation.
1501     /** Must be called by any client using the library */
1502     void    LibraryInit(void);
1503 
1504     /** Start listening a port. Returns socket descriptor. */
1505     int     Listen(Address addr);
1506     /** Stop listening to a port. */
1507     /** Get the address bound to the socket descriptor returned by Listen() */
1508     Address BoundAddress(evutil_socket_t sock);
1509     void    Shutdown();
1510     /** Open a file, start a transmission; fill it with content for a given
1511         root hash and tracker (optional). If "force_check_diskvshash" is true, the
1512         hashtree state will be (re)constructed from the file on disk (if any).
1513         If not, open will try to reconstruct the hashtree state from
1514         the .mhash and .mbinmap files on disk. .mhash files are created
1515         automatically, .mbinmap files must be written by checkpointing the
1516         transfer by calling FileTransfer::serialize(). If the reconstruction
1517         fails, it will hashcheck anyway. Roothash is optional for new files or
1518         files already hashchecked and checkpointed. If "check_netwvshash" is
1519         false, no uncle hashes will be sent and no data will be verified against
1520         them on receipt. In this mode, checking disk contents against hashes
1521         no longer works on restarts, unless checkpoints are used.
1522         The .mhash and .mbinmap files might be located along with the file, or in
1523         a separate directory specified by the metadir parameter.
1524         */
1525     // TODO: replace check_netwvshash with full set of protocol options
1526     int     Open(std::string filename, SwarmID& swarmid, std::string trackerurl="", bool force_check_diskvshash=true,
1527                  popt_cont_int_prot_t cipm=POPT_CONT_INT_PROT_MERKLE, bool zerostate=false, bool activate=true,
1528                  uint32_t chunk_size=SWIFT_DEFAULT_CHUNK_SIZE, std::string metadir="");
1529     /** Get the root hash for the transmission. */
1530     SwarmID GetSwarmID(int file);
1531     /** Close a file and a transmission, remove state or content if desired. */
1532     void    Close(int td, bool removestate = false, bool removecontent = false);
1533     /** Add a possible peer which participares in a given transmission. In the case
1534         root hash is zero, the peer might be talked to regarding any transmission
1535         (likely, a tracker, cache or an archive). */
1536     void    AddPeer(Address& address, SwarmID& swarmid);
1537 
1538     /** UNIX pread approximation. Does change file pointer. Thread-safe if no concurrent writes. Autoactivates */
1539     ssize_t Read(int td, void *buf, size_t nbyte, int64_t offset);  // off_t not 64-bit dynamically on Win32
1540 
1541     /** UNIX pwrite approximation. Does change file pointer. Is not thread-safe. Autoactivates. */
1542     ssize_t Write(int td, const void *buf, size_t nbyte, int64_t offset);
1543 
1544     /** Seek, i.e., move start of interest window */
1545     int     Seek(int td, int64_t offset, int whence);
1546     /** Set the default tracker that is used when Open is not passed a tracker
1547         address. */
1548     void    SetTracker(std::string trackerurl);
1549     /** Returns size of the file in bytes, 0 if unknown. Might be rounded up
1550         to a kilobyte before the transmission is complete. */
1551     uint64_t Size(int td);
1552     /** Returns the amount of retrieved and verified data, in bytes.
1553         A 100% complete transmission has Size()==Complete(). */
1554     uint64_t Complete(int td);
1555     bool    IsComplete(int td);
1556     /** Returns the number of bytes that are complete sequentially, starting
1557         from the beginning, till the first not-yet-retrieved packet.
1558         For LIVE beginning = GetHookinOffset() */
1559     uint64_t SeqComplete(int td, int64_t offset=0);
1560     /** Returns the bin at which we hooked into the live stream. */
1561     uint64_t GetHookinOffset(int td);
1562 
1563     /** Arno: See if swarm is known and activate if requested */
1564     int     Find(SwarmID& swarmid, bool activate=false);
1565     /** Returns the number of bytes in a chunk for this transmission */
1566     uint32_t ChunkSize(int td);
1567 
1568     // LIVE
1569     /** To create a live stream as source */
1570     LiveTransfer *LiveCreate(std::string filename, KeyPair &keypair, std::string checkpoint_filename,
1571                              popt_cont_int_prot_t cipm=POPT_CONT_INT_PROT_UNIFIED_MERKLE, uint64_t disc_wnd=POPT_LIVE_DISC_WND_ALL,
1572                              uint32_t nchunks_per_sign=SWIFT_DEFAULT_LIVE_NCHUNKS_PER_SIGN, uint32_t chunk_size=SWIFT_DEFAULT_CHUNK_SIZE);
1573     /** To add chunks to a live stream as source */
1574     int     LiveWrite(LiveTransfer *lt, const void *buf, size_t nbyte);
1575     /** To open a live stream as peer */
1576     int     LiveOpen(std::string filename, SwarmID &swarmid, std::string trackerurl, Address &srcaddr,
1577                      popt_cont_int_prot_t cipm, uint64_t disc_wnd=POPT_LIVE_DISC_WND_ALL, uint32_t chunk_size=SWIFT_DEFAULT_CHUNK_SIZE);
1578 
1579     /** Register a callback for when the download of another pow(2,agg) chunks
1580         is finished. So agg = 0 = 2^0 = 1, means every chunk. */
1581     void    AddProgressCallback(int td, ProgressCallback cb, uint8_t agg);
1582     /** Deregister a previously added callback. */
1583     void    RemoveProgressCallback(int td, ProgressCallback cb);
1584 
1585     /** Return the transfer descriptors of all loaded transfers (incl. LIVE). */
1586     tdlist_t GetTransferDescriptors();
1587     /** Set the maximum speed in bytes/s for the transfer */
1588     void    SetMaxSpeed(int td, data_direction_t ddir, double speed);
1589     /** Get the current speed in bytes/s for the transfer, if activated. */
1590     double  GetCurrentSpeed(int td, data_direction_t ddir);
1591     /** Get the number of incomplete peers for the transfer, if activated. */
1592     uint32_t  GetNumLeechers(int td);
1593     /** Get the number of completed peers for the transfer, if activated. */
1594     uint32_t  GetNumSeeders(int td);
1595     /** Return the type of this transfer */
1596     transfer_t ttype(int td);
1597     /** Get Storage object for the transfer, if activated. */
1598     Storage *GetStorage(int td);
1599     /** Get Storage object's main storage filename. */
1600     std::string GetOSPathName(int td);
1601     /** Whether this transfer is in working condition, if activated. */
1602     bool    IsOperational(int td);
1603     /** Whether this transfer uses the zero-state implementation. */
1604     bool    IsZeroState(int td);
1605     /** Save the binmap for the transfer for restarts without from-disk hash checking */
1606     int Checkpoint(int transfer);
1607 
1608     /** Return the ContentTransfer * for the transfer, if activated.
1609         For internal use only */
1610     ContentTransfer *GetActivatedTransfer(int td);
1611     /** Record use of this transfer. For internal use only. */
1612     void Touch(int td);
1613     /** Write a checkpoint for filename in .mhash and .mbinmap files without
1614      * creating a FileTransfer object */
1615     int HashCheckOffline(std::string filename, Sha1Hash *calchashptr, uint32_t chunk_size=SWIFT_DEFAULT_CHUNK_SIZE);
1616 
1617     // Arno: helper functions for constructing datagrams */
1618     int evbuffer_add_string(struct evbuffer *evb, std::string str);
1619     int evbuffer_add_8(struct evbuffer *evb, uint8_t b);
1620     int evbuffer_add_16be(struct evbuffer *evb, uint16_t w);
1621     int evbuffer_add_32be(struct evbuffer *evb, uint32_t i);
1622     int evbuffer_add_64be(struct evbuffer *evb, uint64_t l);
1623     int evbuffer_add_hash(struct evbuffer *evb, const Sha1Hash& hash);
1624     int evbuffer_add_chunkaddr(struct evbuffer *evb, bin_t &b, popt_chunk_addr_t chunk_addr); // PPSP
1625     int evbuffer_add_pexaddr(struct evbuffer *evb, Address& a);
1626 
1627     uint8_t evbuffer_remove_8(struct evbuffer *evb);
1628     uint16_t evbuffer_remove_16be(struct evbuffer *evb);
1629     uint32_t evbuffer_remove_32be(struct evbuffer *evb);
1630     uint64_t evbuffer_remove_64be(struct evbuffer *evb);
1631     Sha1Hash evbuffer_remove_hash(struct evbuffer* evb);
1632     binvector evbuffer_remove_chunkaddr(struct evbuffer *evb, popt_chunk_addr_t chunk_addr); // PPSP
1633     Address evbuffer_remove_pexaddr(struct evbuffer *evb, int family);
1634     void chunk32_to_bin32(uint32_t schunk, uint32_t echunk, binvector *bvptr);
1635     binvector bin_fragment(bin_t &origbin, bin_t &cancelbin);
1636 
1637     const char* tintstr(tint t=0);
1638 
1639     // SOCKTUNNEL
1640     bool CmdGwTunnelCheckChannel(uint32_t tunnel_id); // messages prefixed with tunnel id will be forwarded
1641     void CmdGwTunnelUDPDataCameIn(Address srcaddr, uint32_t srcchan, struct evbuffer* evb);
1642     void CmdGwTunnelSendUDP(struct evbuffer *evb); // for friendship with Channel
1643 
1644 } // namespace end
1645 
1646 // #define SWIFT_MUTE
1647 
1648 #ifndef SWIFT_MUTE
1649 #define dprintf(...) do { if (Channel::debug_file) fprintf(Channel::debug_file,__VA_ARGS__); } while (0)
1650 #define dflush() fflush(Channel::debug_file)
1651 #define lprintf(...) do { if (Channel::debug_ledbat) fprintf(Channel::debug_ledbat,__VA_ARGS__); } while (0)
1652 #define lflush() fflush(Channel::debug_ledbat)
1653 #else
1654 #define dprintf(...) do {} while(0)
1655 #define dflush() do {} while(0)
1656 #endif
1657 #define eprintf(...) fprintf(stderr,__VA_ARGS__)
1658 
1659 #endif
1660