1 /*
2  *  live.cpp
3  *  Subclass of ContentTransfer for live streaming.
4  *
5  *  Arno: Currently uses ever increasing chunk IDs. The binmap datastructure
6  *  can store this quite efficiently, as long as there are few holes.
7  *  The Storage object can save all chunks or wrap around, that is, a certain
8  *  modulo is applied that overwrites earlier chunks.
9  *  This modulo is equivalent to the live discard window (see IETF PPSPP spec).
10  *  This overwriting can be done both at the source and in a client.
11  *
12  *  Created by Arno Bakker.
13  *  Copyright 2009-2016 TECHNISCHE UNIVERSITEIT DELFT. All rights reserved.
14  *
15  * Note:
16  *
17  *  - Windows needs live discard window < max as it doesn't have sparse files,
18  *    so tuning in at chunk 197992 without a smaller window causes a
19  *    file-allocation stall in clients.
20  *
21  *  TODO:
22  *  - picker than optimizes sharing (cf. small swarms sharing)
23  *      * don't have piece_due
24  *          + Could build now based on SIGNED_INTEGRITY timestamp
25  *      * need rarest or latest bla-bla (UTorino guy)
26  *      DONE: probabilistic one
27  *
28  *  - aux live seeders?
29  *
30  *  - restartable live source (idea for UMT: just start new subtree,
31  *    remembering transient root hash of previous to be used when tree grows
32  *    in level above current size.)
33  *        DONE.
34  *
35  *  - avg upload buggy? Check problem found by Riccardo.
36  *
37  *  - (related) Pass live discard window via CMDGW interface. Needed by Windows, see above.
38  *
39  *  - sendrecv.cpp Don't add DATA+bin+etc if read of data fails.
40  *
41  *  - avoid infinitely growing vector of channels.
42  *
43  *  - Crash on end-of-HTTP request for live.
44  *
45  *  - Replace divergence with time based approached using timestamp from SIGNED_INTEGRITY
46  *     + OLD: temp check if not diverging too much from source via CalculateHookinPos() authoritative.
47  *     + OLD: Policy for how far peer and source may be apart before rehook-in
48  *     + OLD: Idea: client skips when signed peaks arrives that is way past hook-in point.
49  *
50  *  - Write/Read tracker from -r urlfilename too.
51  */
52 //LIVE
53 #include "swift.h"
54 #include <cfloat>
55 
56 #include "ext/live_picker.cpp" // FIXME FIXME FIXME FIXME
57 
58 using namespace swift;
59 
60 
61 /*
62  * Global Variables
63  */
64 std::vector<LiveTransfer*> LiveTransfer::liveswarms;
65 
66 
67 /*
68  * Local Constants
69  */
70 // live transfers get a transfer description (TD) above this offset
71 #define TRANSFER_DESCR_LIVE_OFFSET  4000000
72 
73 /** A constructor for a live source. */
74 LiveTransfer::LiveTransfer(std::string filename, KeyPair &keypair, std::string checkpoint_filename,
75                            popt_cont_int_prot_t cipm, uint64_t disc_wnd, uint32_t nchunks_per_sign, uint32_t chunk_size) :
76     ContentTransfer(LIVE_TRANSFER), ack_out_right_basebin_(bin_t::NONE),
77     chunk_size_(chunk_size), am_source_(true),
78     filename_(filename), last_chunkid_(0), offset_(0),
79     chunks_since_sign_(0),
80     checkpoint_filename_(checkpoint_filename), checkpoint_bin_(bin_t::NONE)
81 {
82     Initialize(keypair,cipm,disc_wnd,nchunks_per_sign);
83 
84     // Determine swarmID from keypair
85     SwarmPubKey    *spubkey_ptr = keypair.GetSwarmPubKey();
86     if (spubkey_ptr == NULL) {
87         SetBroken();
88         return;
89     }
90     swarm_id_ = SwarmID(*spubkey_ptr);
91 
92     picker_ = NULL;
93 
94     // Restarting source from checkpoint?
95 
96     BinHashSigTuple lastmunrotup = ReadCheckpoint();
97     if (GetDefaultHandshake().cont_int_prot_ == POPT_CONT_INT_PROT_NONE) {
98         // Start generating chunks from rootbin.base_right()+1
99     } else if (GetDefaultHandshake().cont_int_prot_ == POPT_CONT_INT_PROT_UNIFIED_MERKLE) {
100         /*
101          * Read live source state from checkpoint. This is the info about
102          * the last munro in the tree from the previous instance.
103          * We turn this munro into a first munro in our new tree, but
104          * do not advertise its chunks. Clients should skip over the unused
105          * parts of the old tree and start downloading the chunks in the new
106          * part of our tree.
107          *
108          *           new virtual root
109          *             /          \
110          *            /            \
111          *     checkpoint        first new chunk
112          *         munro
113          */
114         LiveHashTree *umt = (LiveHashTree *)hashtree_;
115         if (lastmunrotup.bin() != bin_t::NONE) {
116             fprintf(stderr,"live: source: found checkpoint\n");
117 
118             if (umt->InitFromCheckpoint(lastmunrotup)) {
119                 checkpoint_bin_ = lastmunrotup.bin();
120                 last_chunkid_ = checkpoint_bin_.base_right().base_offset()+1;
121                 offset_ = last_chunkid_ * chunk_size_;
122 
123                 UpdateSignedAckOut();
124             }
125         }
126 
127         fprintf(stderr,"live: source: restored lastchunkid %" PRIu64 "\n", last_chunkid_);
128     } else { // SIGNALL
129         // Start generating chunks from rootbin.base_right()+1
130     }
131 }
132 
133 
134 /** A constructor for live client. */
135 LiveTransfer::LiveTransfer(std::string filename, SwarmID &swarmid, Address &srcaddr, popt_cont_int_prot_t cipm,
136                            uint64_t disc_wnd, uint32_t chunk_size) :
137     ContentTransfer(LIVE_TRANSFER), chunk_size_(chunk_size), am_source_(false),
138     filename_(filename), last_chunkid_(0), offset_(0),
139     chunks_since_sign_(0),
140     checkpoint_filename_(""), checkpoint_bin_(bin_t::NONE),
141     srcaddr_(srcaddr)
142 {
143     swarm_id_ = swarmid;
144     SwarmPubKey spubkey = swarm_id_.spubkey();
145     KeyPair *kp = spubkey.GetPublicKeyPair();
146 
147     if (kp == NULL) {
148         SetBroken();
149         return;
150     }
151 
152     Initialize(*kp,cipm,disc_wnd,0);
153 
154 #if ENABLE_LIVE_SMALLSWARMOPT_PIECEPICKER == 1
155     fprintf(stderr,"live: Enabling small swarm optimization\n");
156     picker_ = new SharingLivePiecePicker(this);
157 #else
158     picker_ = new SimpleLivePiecePicker(this);
159 #endif
160     picker_->Randomize(rand()&63);
161 }
162 
163 
164 void LiveTransfer::Initialize(KeyPair &keypair, popt_cont_int_prot_t cipm, uint64_t disc_wnd,uint32_t nchunks_per_sign)
165 {
166     GlobalAdd();
167 
168     Handshake hs;
169     hs.live_sig_alg_ = keypair.GetSigAlg();
170     if (cipm == POPT_CONT_INT_PROT_UNIFIED_MERKLE)
171         hs.cont_int_prot_ = cipm;
172     else
173         hs.cont_int_prot_ = POPT_CONT_INT_PROT_NONE;
174     hs.live_disc_wnd_ = disc_wnd;
175 
176     fprintf(stderr,"LiveTransfer::Initialize: cipm %" PRIu32 "\n", hs.cont_int_prot_);
177     fprintf(stderr,"LiveTransfer::Initialize: lsa  %" PRIu32 "\n", hs.live_sig_alg_);
178     fprintf(stderr,"LiveTransfer::Initialize: ldw  %" PRIu64 "\n", hs.live_disc_wnd_);
179 
180     SetDefaultHandshake(hs);
181 
182     std::string destdir;
183     int ret = file_exists_utf8(filename_);
184     if (ret == 2)  {
185         // Filename is a directory, download to swarmid-as-hex file there
186         destdir = filename_;
187         filename_ = destdir+FILE_SEP+swarm_id_.tofilename();
188     } else {
189         destdir = dirname_utf8(filename_);
190         if (destdir == "")
191             destdir = ".";
192     }
193 
194     // Live, delete any existing storage
195     (void)remove_utf8(filename_);
196 
197     // MULTIFILE
198     uint64_t ldwb = hs.live_disc_wnd_;
199     if (ldwb != POPT_LIVE_DISC_WND_ALL)
200         ldwb *= chunk_size_;
201     storage_ = new Storage(filename_,destdir,td_,ldwb,"");
202 
203     if (hs.cont_int_prot_ == POPT_CONT_INT_PROT_UNIFIED_MERKLE) {
204         if (nchunks_per_sign > 1)
205             hashtree_ = new LiveHashTree(storage_,keypair,chunk_size_,nchunks_per_sign); // source
206         else
207             hashtree_ = new LiveHashTree(storage_,keypair,chunk_size_); //client
208     } else
209         hashtree_ = NULL;
210 }
211 
212 
213 LiveTransfer::~LiveTransfer()
214 {
215     if (picker_ != NULL) {
216         delete picker_;
217         picker_ = NULL;
218     }
219 
220     GlobalDel();
221 }
222 
223 
224 
225 void LiveTransfer::GlobalAdd()
226 {
227 
228     int idx = liveswarms.size();
229     td_ = idx + TRANSFER_DESCR_LIVE_OFFSET;
230 
231     if (liveswarms.size()<idx+1)
232         liveswarms.resize(idx+1);
233     liveswarms[idx] = this;
234 }
235 
236 
237 void LiveTransfer::GlobalDel()
238 {
239     int idx = td_ - TRANSFER_DESCR_LIVE_OFFSET;
240     liveswarms[idx] = NULL;
241 }
242 
243 
244 LiveTransfer *LiveTransfer::FindByTD(int td)
245 {
246     int idx = td - TRANSFER_DESCR_LIVE_OFFSET;
247     return idx<liveswarms.size() ? (LiveTransfer *)liveswarms[idx] : NULL;
248 }
249 
250 LiveTransfer* LiveTransfer::FindBySwarmID(const SwarmID& swarmid)
251 {
252     for (int i=0; i<liveswarms.size(); i++)
253         if (liveswarms[i] && liveswarms[i]->swarm_id()==swarmid)
254             return liveswarms[i];
255     return NULL;
256 }
257 
258 
259 tdlist_t LiveTransfer::GetTransferDescriptors()
260 {
261     tdlist_t tds;
262     for (int i=0; i<liveswarms.size(); i++)
263         if (liveswarms[i] != NULL)
264             tds.push_back(i+TRANSFER_DESCR_LIVE_OFFSET);
265     return tds;
266 }
267 
268 
269 
270 
271 uint64_t LiveTransfer::SeqComplete()
272 {
273 
274     if (am_source_) {
275         uint64_t seqc = ack_out()->find_empty().base_offset();
276         return seqc*chunk_size_;
277     }
278     bin_t hpos = ((LivePiecePicker *)picker())->GetHookinPos();
279     bin_t cpos = ((LivePiecePicker *)picker())->GetCurrentPos();
280     if (hpos == bin_t::NONE || cpos == bin_t::NONE)
281         return 0;
282     else {
283         uint64_t seqc = cpos.layer_offset() - hpos.layer_offset();
284         return seqc*chunk_size_;
285     }
286 }
287 
288 
289 uint64_t LiveTransfer::GetHookinOffset()
290 {
291 
292     bin_t hpos = ((LivePiecePicker *)picker())->GetHookinPos();
293     uint64_t seqc = hpos.layer_offset();
294     return seqc*chunk_size_;
295 }
296 
297 
298 
299 
300 int LiveTransfer::AddData(const void *buf, uint32_t nbyte)
301 {
302     // fprintf(stderr,"%s live: AddData: writing to storage %" PRIu64 "\n", tintstr(), nbyte);
303 
304     // Save chunk on disk
305     int ret = storage_->Write(buf,nbyte,offset_);
306     if (ret < 0) {
307         print_error("live: create: error writing to storage");
308         return ret;
309     }
310     //else
311     //    fprintf(stderr,"%s live: AddData: stored %d bytes\n", tintstr(), ret );
312 
313     uint64_t till = std::max((uint32_t)1,nbyte/chunk_size_);
314     bool newepoch=false;
315     for (uint64_t c=0; c<till; c++) {
316         // New chunk is here
317         bin_t chunkbin(0,last_chunkid_);
318         ack_out_.set(chunkbin);
319 
320         last_chunkid_++;
321         offset_ += chunk_size_;
322 
323         // SIGNPEAK
324         if (def_hs_out_.cont_int_prot_ == POPT_CONT_INT_PROT_UNIFIED_MERKLE) {
325             LiveHashTree *umt = (LiveHashTree *)hashtree();
326             uint32_t bufidx = c*chunk_size_;
327             char *bufptr = ((char *)buf)+bufidx;
328             uint32_t s = std::min(chunk_size_,nbyte-bufidx);
329             // Build dynamic hash tree
330             umt->AddData(bufptr,s);
331 
332             // Create new signed peaks after N chunks
333             // Note: this means that if we use a file as input, the last < N
334             // chunks never get announced.
335             chunks_since_sign_++;
336             if (chunks_since_sign_ == umt->GetNChunksPerSig()) {
337                 BinHashSigTuple lasttup = umt->AddSignedMunro();
338                 // LIVECHECKPOINT
339                 if (checkpoint_filename_.length() > 0) {
340                     WriteCheckpoint(lasttup);
341                 }
342 
343                 chunks_since_sign_ = 0;
344                 newepoch = true;
345 
346                 // Arno, 2013-02-26: Can only send HAVEs covered by signed peaks
347                 // At this point in time, peaks == signed peaks
348                 UpdateSignedAckOut();
349 
350                 // Forget old part of tree
351                 if (def_hs_out_.live_disc_wnd_ != POPT_LIVE_DISC_WND_ALL) {
352                     OnDataPruneTree(def_hs_out_,bin_t(0,last_chunkid_),umt->GetNChunksPerSig());
353                 }
354             }
355         } else
356             newepoch = true;
357     }
358 
359     fprintf(stderr,"live: AddData: added till chunkid %" PRIi64 "\n", last_chunkid_);
360     dprintf("%s %%0 live: AddData: added till chunkid %" PRIi64 "\n", tintstr(), last_chunkid_);
361 
362     // Arno, 2013-02-26: When UNIFIED_MERKLE chunks are published in batches
363     // of nchunks_per_sign_
364     if (!newepoch)
365         return 0;
366 
367     // Announce chunks to peers via HAVEs
368     fprintf(stderr,"live: AddData: announcing to " PRISIZET " channels\n", mychannels_.size());
369     channels_t::iterator iter;
370     for (iter=mychannels_.begin(); iter!=mychannels_.end(); iter++) {
371         Channel *c = *iter;
372         //DDOS
373         if (c->is_established()) {
374             dprintf("%s %%0 live: AddData: send on channel %d\n", tintstr(), c->id());
375             c->LiveSend();
376         }
377     }
378 
379     return 0;
380 }
381 
382 
383 void LiveTransfer::UpdateSignedAckOut()
384 {
385     // Arno, 2013-02-26: Can only send HAVEs covered by signed peaks
386     // At this point in time, peaks == signed peaks
387     LiveHashTree *umt = (LiveHashTree *)hashtree();
388 
389     signed_ack_out_.clear();
390     for (int i=0; i<umt->peak_count(); i++) {
391         bin_t sigpeak = umt->peak(i);
392         signed_ack_out_.set(sigpeak);
393 
394         //fprintf(stderr,"live: AddData: UMT: DOHAVE %s %s %s\n", sigpeak.str().c_str(), sigpeak.base_left().str().c_str(), sigpeak.base_right().str().c_str() );
395     }
396     // LIVECHECKPOINT, see constructor
397     // SIGNMUNRO
398     if (checkpoint_bin_ != bin_t::NONE) {
399         for (int i=0; i<=checkpoint_bin_.layer_offset(); i++) {
400             bin_t clearbin(checkpoint_bin_.layer(),i);
401             signed_ack_out_.reset(clearbin);
402 
403             //fprintf(stderr,"live: AddData: UMT: UNHAVE %s %s %s\n", clearbin.str().c_str(), clearbin.base_left().str().c_str(), clearbin.base_right().str().c_str() );
404         }
405         // TEST
406         /* fprintf(stderr,"live: AddData: UMT: checkp %s\n", checkpoint_bin_.str().c_str() );
407         bin_t bf = signed_ack_out_.find_filled();
408         bin_t be = signed_ack_out_.find_empty();
409         bin_t sbe = signed_ack_out_.find_empty(checkpoint_bin_);
410 
411         fprintf(stderr,"live: AddData: UMT: bf %s be %s sbe %s\n", bf.str().c_str(), be.str().c_str(), sbe.str().c_str() );
412         */
413     }
414 }
415 
416 
417 
418 void LiveTransfer::UpdateOperational()
419 {
420 }
421 
422 
423 binmap_t *LiveTransfer::ack_out_signed()
424 {
425     if (!am_source_ || hashtree() == NULL)
426         return &ack_out_;
427 
428     // Arno, 2013-02-26: Cannot send HAVEs not covered by signed peak
429     return &signed_ack_out_;
430 }
431 
432 binmap_t *LiveTransfer::ack_out()
433 {
434     if (GetDefaultHandshake().cont_int_prot_ == POPT_CONT_INT_PROT_UNIFIED_MERKLE)
435         return hashtree_->ack_out();
436     else
437         return &ack_out_; // tree less, use local binmap.
438 }
439 
440 
441 void LiveTransfer::OnVerifiedMunroHash(bin_t munro, tint sourcet)
442 {
443     // Channel sendc received a correctly signed munro.
444     LiveHashTree *umt = (LiveHashTree *)hashtree();
445     if (umt != NULL)
446         umt->SetNChunksPerSig(munro.base_length());
447 
448     // Arno, 2013-05-22: Hook-in using signed peaks in UMT.
449     LivePiecePicker *lpp = (LivePiecePicker *)picker_;
450     lpp->AddPeerMunro(munro, sourcet);
451 }
452 
453 
454 void LiveTransfer::OnDataPruneTree(Handshake &hs_out, bin_t pos, uint32_t nchunks2forget)
455 {
456     if (nchunks2forget < 1) // nchunks_per_sig_ unknown
457         return;
458 
459     if (ack_out_right_basebin_ == bin_t::NONE || pos > ack_out_right_basebin_)
460         ack_out_right_basebin_ = pos;
461     else
462         return; // Don't prune if no change
463 
464     uint64_t lastchunkid = ack_out_right_basebin_.layer_offset();
465 
466     int64_t oldcid = ((int64_t)lastchunkid - (int64_t)hs_out.live_disc_wnd_);
467     if (oldcid > 0) {
468         // Find subtree left of window with width nchunks2forget that can be pruned
469         uint64_t extracid = oldcid % nchunks2forget;
470         uint64_t startcid = oldcid - extracid;
471         int64_t leftcid = ((int64_t)startcid - (int64_t)nchunks2forget);
472         if (leftcid >= 0) {
473             bin_t leftpos(0,leftcid);
474 
475             bin_t::uint_t nchunks_per_sign_layer = (bin_t::uint_t)log2((double)nchunks2forget);
476             for (int h=0; h<nchunks_per_sign_layer; h++) {
477                 leftpos = leftpos.parent();
478             }
479 
480             // Find biggest subtree to remove
481             if (leftpos.is_right()) {
482                 while (leftpos.parent().is_right()) {
483                     leftpos = leftpos.parent();
484                 }
485             }
486             //fprintf(stderr,"live: OnDataPruneTree: prune %s log %lf nchunks %d window %" PRIu64 " when %" PRIu64 "\n", leftpos.str().c_str(), log2((double)lastchunkid), nchunks2forget, hs_out.live_disc_wnd_, lastchunkid );
487             LiveHashTree *umt = (LiveHashTree *)hashtree();
488             umt->PruneTree(leftpos);
489         }
490     }
491 }
492 
493 
494 int LiveTransfer::WriteCheckpoint(BinHashSigTuple &munrotup)
495 {
496     // FORMAT: (layer,layeroff) munrohash-in-hex timestamp munrosig-in-hex\n
497     char tscstr[256];
498     sprintf(tscstr,"%" PRIi64 "",munrotup.sigtint().time());
499     std::string s = munrotup.bin().str()+" "+munrotup.hash().hex()+" "+std::string(tscstr)+" "
500                     +munrotup.sigtint().sig().hex()+"\n";
501     char *cstr = new char[strlen(s.c_str())+1];
502 
503     strcpy(cstr,s.c_str());
504 
505     // TODO: atomic?
506     int fd = open_utf8(checkpoint_filename_.c_str(),OPENFLAGS,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
507     if (fd < 0) {
508         print_error("could not write checkpoint file");
509         delete cstr;
510         return fd;
511     }
512     int ret = write(fd,cstr,strlen(cstr));
513     if (ret < 0) {
514         print_error("could not write live checkpoint data");
515         delete cstr;
516         return ret;
517     }
518     delete cstr;
519     ret = close(fd);
520 
521     return ret;
522 }
523 
524 
525 BinHashSigTuple LiveTransfer::ReadCheckpoint()
526 {
527     // TODO: atomic?
528     int fd = open_utf8(checkpoint_filename_.c_str(),ROOPENFLAGS,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH);
529     if (fd < 0) {
530         print_error("could not read live checkpoint file");
531         return BinHashSigTuple::NOBULL;
532     }
533     char buffer[1024];
534     int ret = read(fd,buffer,1024);
535     if (ret < 0) {
536         print_error("could not read live checkpoint data");
537         return BinHashSigTuple::NOBULL;
538     }
539     close(fd);
540 
541     // Overwrite \n with end-of-string
542     buffer[ret-1] = '\0';
543     std::string pstr(buffer);
544 
545     std::string binstr;
546     std::string hashstr;
547     std::string timestr;
548     std::string sigstr;
549     int sidx = pstr.find(" ");
550     if (sidx == std::string::npos) {
551         print_error("could not parsing live checkpoint: no bin");
552         return BinHashSigTuple::NOBULL;
553     } else {
554         binstr = pstr.substr(0,sidx);
555         int midx = pstr.find(" ",sidx+1);
556         if (midx == std::string::npos) {
557             print_error("could not parsing live checkpoint: no hash");
558             return BinHashSigTuple::NOBULL;
559         } else {
560             hashstr = pstr.substr(sidx+1,midx-sidx-1);
561             int m2idx = pstr.find(" ",midx+1);
562             if (m2idx == std::string::npos) {
563                 print_error("could not parsing live checkpoint: no timestamp");
564                 return BinHashSigTuple::NOBULL;
565             } else {
566                 timestr = pstr.substr(midx+1,m2idx-midx-1);
567                 sigstr = pstr.substr(m2idx+1);
568             }
569         }
570     }
571 
572     sidx = binstr.find(",");
573     if (sidx == std::string::npos) {
574         print_error("could not parsing live checkpoint: bin bad");
575         return BinHashSigTuple::NOBULL;
576     }
577 
578 
579     //fprintf(stderr,"CHECKPOINT read <%s> <%s> <%s> <%s>\n", binstr.c_str(), hashstr.c_str(), timestr.c_str(), sigstr.c_str() );
580 
581     std::string layerstr=binstr.substr(1,sidx-1);
582     std::string layeroffstr=binstr.substr(sidx+1,binstr.length()-sidx-2);
583 
584     int layer;
585     ret = sscanf(layerstr.c_str(),"%d",&layer);
586     if (ret != 1) {
587         print_error("could not parsing live checkpoint: bin layer bad");
588         return BinHashSigTuple::NOBULL;
589     }
590     bin_t::uint_t layeroff;
591     ret = sscanf(layeroffstr.c_str(),"%llu",&layeroff);
592     if (ret != 1) {
593         print_error("could not parsing live checkpoint: bin layer off bad");
594         return BinHashSigTuple::NOBULL;
595     }
596 
597 
598     tint munrotimestamp;
599     ret = sscanf(timestr.c_str(),"%" SCNi64 "",&munrotimestamp);
600     if (ret != 1) {
601         print_error("could not parsing live checkpoint: timestamp bad");
602         return BinHashSigTuple::NOBULL;
603     }
604 
605 
606     bin_t munrobin(layer,layeroff);
607     Sha1Hash munrohash = Sha1Hash(true,hashstr.c_str());
608     Signature munrosig = Signature(true,(const uint8_t *)sigstr.c_str(),(uint16_t)sigstr.length());
609     SigTintTuple munrost = SigTintTuple(munrosig,munrotimestamp);
610 
611     //fprintf(stderr,"CHECKPOINT parsed <%s> %d %" SCNu64 " <%s> <%s> %" PRIi64 " <%s>\n", munrobin.str().c_str(), layer, layeroff, munrohash.hex().c_str(), timestr.c_str(), munrotimestamp, munrosig.hex().c_str() );
612 
613     return BinHashSigTuple(munrobin,munrohash,munrost);
614 }
615 
616 
617 bin_t LiveTransfer::GetSourceCurrentPos()
618 {
619     return bin_t(0,last_chunkid_);
620 }
621 
622 
623 /*
624  * Channel extensions for live
625  */
626 
627 void Channel::LiveSend()
628 {
629     //fprintf(stderr,"live: LiveSend: channel %d\n", id() );
630 
631     if (evsendlive_ptr_ == NULL) {
632         evsendlive_ptr_ = new struct event;
633         // Arno, 2013-02-01: Don't reassign, causes crashes.
634         evtimer_assign(evsendlive_ptr_,evbase,&Channel::LibeventSendCallback,this);
635     }
636     //fprintf(stderr,"live: LiveSend: next %" PRIi64 "\n", next_send_time_ );
637     evtimer_add(evsendlive_ptr_,tint2tv(next_send_time_));
638 }
639 
640