1 /* 2 * live.cpp 3 * Subclass of ContentTransfer for live streaming. 4 * 5 * Arno: Currently uses ever increasing chunk IDs. The binmap datastructure 6 * can store this quite efficiently, as long as there are few holes. 7 * The Storage object can save all chunks or wrap around, that is, a certain 8 * modulo is applied that overwrites earlier chunks. 9 * This modulo is equivalent to the live discard window (see IETF PPSPP spec). 10 * This overwriting can be done both at the source and in a client. 11 * 12 * Created by Arno Bakker. 13 * Copyright 2009-2016 TECHNISCHE UNIVERSITEIT DELFT. All rights reserved. 14 * 15 * Note: 16 * 17 * - Windows needs live discard window < max as it doesn't have sparse files, 18 * so tuning in at chunk 197992 without a smaller window causes a 19 * file-allocation stall in clients. 20 * 21 * TODO: 22 * - picker than optimizes sharing (cf. small swarms sharing) 23 * * don't have piece_due 24 * + Could build now based on SIGNED_INTEGRITY timestamp 25 * * need rarest or latest bla-bla (UTorino guy) 26 * DONE: probabilistic one 27 * 28 * - aux live seeders? 29 * 30 * - restartable live source (idea for UMT: just start new subtree, 31 * remembering transient root hash of previous to be used when tree grows 32 * in level above current size.) 33 * DONE. 34 * 35 * - avg upload buggy? Check problem found by Riccardo. 36 * 37 * - (related) Pass live discard window via CMDGW interface. Needed by Windows, see above. 38 * 39 * - sendrecv.cpp Don't add DATA+bin+etc if read of data fails. 40 * 41 * - avoid infinitely growing vector of channels. 42 * 43 * - Crash on end-of-HTTP request for live. 44 * 45 * - Replace divergence with time based approached using timestamp from SIGNED_INTEGRITY 46 * + OLD: temp check if not diverging too much from source via CalculateHookinPos() authoritative. 47 * + OLD: Policy for how far peer and source may be apart before rehook-in 48 * + OLD: Idea: client skips when signed peaks arrives that is way past hook-in point. 49 * 50 * - Write/Read tracker from -r urlfilename too. 51 */ 52 //LIVE 53 #include "swift.h" 54 #include <cfloat> 55 56 #include "ext/live_picker.cpp" // FIXME FIXME FIXME FIXME 57 58 using namespace swift; 59 60 61 /* 62 * Global Variables 63 */ 64 std::vector<LiveTransfer*> LiveTransfer::liveswarms; 65 66 67 /* 68 * Local Constants 69 */ 70 // live transfers get a transfer description (TD) above this offset 71 #define TRANSFER_DESCR_LIVE_OFFSET 4000000 72 73 /** A constructor for a live source. */ 74 LiveTransfer::LiveTransfer(std::string filename, KeyPair &keypair, std::string checkpoint_filename, 75 popt_cont_int_prot_t cipm, uint64_t disc_wnd, uint32_t nchunks_per_sign, uint32_t chunk_size) : 76 ContentTransfer(LIVE_TRANSFER), ack_out_right_basebin_(bin_t::NONE), 77 chunk_size_(chunk_size), am_source_(true), 78 filename_(filename), last_chunkid_(0), offset_(0), 79 chunks_since_sign_(0), 80 checkpoint_filename_(checkpoint_filename), checkpoint_bin_(bin_t::NONE) 81 { 82 Initialize(keypair,cipm,disc_wnd,nchunks_per_sign); 83 84 // Determine swarmID from keypair 85 SwarmPubKey *spubkey_ptr = keypair.GetSwarmPubKey(); 86 if (spubkey_ptr == NULL) { 87 SetBroken(); 88 return; 89 } 90 swarm_id_ = SwarmID(*spubkey_ptr); 91 92 picker_ = NULL; 93 94 // Restarting source from checkpoint? 95 96 BinHashSigTuple lastmunrotup = ReadCheckpoint(); 97 if (GetDefaultHandshake().cont_int_prot_ == POPT_CONT_INT_PROT_NONE) { 98 // Start generating chunks from rootbin.base_right()+1 99 } else if (GetDefaultHandshake().cont_int_prot_ == POPT_CONT_INT_PROT_UNIFIED_MERKLE) { 100 /* 101 * Read live source state from checkpoint. This is the info about 102 * the last munro in the tree from the previous instance. 103 * We turn this munro into a first munro in our new tree, but 104 * do not advertise its chunks. Clients should skip over the unused 105 * parts of the old tree and start downloading the chunks in the new 106 * part of our tree. 107 * 108 * new virtual root 109 * / \ 110 * / \ 111 * checkpoint first new chunk 112 * munro 113 */ 114 LiveHashTree *umt = (LiveHashTree *)hashtree_; 115 if (lastmunrotup.bin() != bin_t::NONE) { 116 fprintf(stderr,"live: source: found checkpoint\n"); 117 118 if (umt->InitFromCheckpoint(lastmunrotup)) { 119 checkpoint_bin_ = lastmunrotup.bin(); 120 last_chunkid_ = checkpoint_bin_.base_right().base_offset()+1; 121 offset_ = last_chunkid_ * chunk_size_; 122 123 UpdateSignedAckOut(); 124 } 125 } 126 127 fprintf(stderr,"live: source: restored lastchunkid %" PRIu64 "\n", last_chunkid_); 128 } else { // SIGNALL 129 // Start generating chunks from rootbin.base_right()+1 130 } 131 } 132 133 134 /** A constructor for live client. */ 135 LiveTransfer::LiveTransfer(std::string filename, SwarmID &swarmid, Address &srcaddr, popt_cont_int_prot_t cipm, 136 uint64_t disc_wnd, uint32_t chunk_size) : 137 ContentTransfer(LIVE_TRANSFER), chunk_size_(chunk_size), am_source_(false), 138 filename_(filename), last_chunkid_(0), offset_(0), 139 chunks_since_sign_(0), 140 checkpoint_filename_(""), checkpoint_bin_(bin_t::NONE), 141 srcaddr_(srcaddr) 142 { 143 swarm_id_ = swarmid; 144 SwarmPubKey spubkey = swarm_id_.spubkey(); 145 KeyPair *kp = spubkey.GetPublicKeyPair(); 146 147 if (kp == NULL) { 148 SetBroken(); 149 return; 150 } 151 152 Initialize(*kp,cipm,disc_wnd,0); 153 154 #if ENABLE_LIVE_SMALLSWARMOPT_PIECEPICKER == 1 155 fprintf(stderr,"live: Enabling small swarm optimization\n"); 156 picker_ = new SharingLivePiecePicker(this); 157 #else 158 picker_ = new SimpleLivePiecePicker(this); 159 #endif 160 picker_->Randomize(rand()&63); 161 } 162 163 164 void LiveTransfer::Initialize(KeyPair &keypair, popt_cont_int_prot_t cipm, uint64_t disc_wnd,uint32_t nchunks_per_sign) 165 { 166 GlobalAdd(); 167 168 Handshake hs; 169 hs.live_sig_alg_ = keypair.GetSigAlg(); 170 if (cipm == POPT_CONT_INT_PROT_UNIFIED_MERKLE) 171 hs.cont_int_prot_ = cipm; 172 else 173 hs.cont_int_prot_ = POPT_CONT_INT_PROT_NONE; 174 hs.live_disc_wnd_ = disc_wnd; 175 176 fprintf(stderr,"LiveTransfer::Initialize: cipm %" PRIu32 "\n", hs.cont_int_prot_); 177 fprintf(stderr,"LiveTransfer::Initialize: lsa %" PRIu32 "\n", hs.live_sig_alg_); 178 fprintf(stderr,"LiveTransfer::Initialize: ldw %" PRIu64 "\n", hs.live_disc_wnd_); 179 180 SetDefaultHandshake(hs); 181 182 std::string destdir; 183 int ret = file_exists_utf8(filename_); 184 if (ret == 2) { 185 // Filename is a directory, download to swarmid-as-hex file there 186 destdir = filename_; 187 filename_ = destdir+FILE_SEP+swarm_id_.tofilename(); 188 } else { 189 destdir = dirname_utf8(filename_); 190 if (destdir == "") 191 destdir = "."; 192 } 193 194 // Live, delete any existing storage 195 (void)remove_utf8(filename_); 196 197 // MULTIFILE 198 uint64_t ldwb = hs.live_disc_wnd_; 199 if (ldwb != POPT_LIVE_DISC_WND_ALL) 200 ldwb *= chunk_size_; 201 storage_ = new Storage(filename_,destdir,td_,ldwb,""); 202 203 if (hs.cont_int_prot_ == POPT_CONT_INT_PROT_UNIFIED_MERKLE) { 204 if (nchunks_per_sign > 1) 205 hashtree_ = new LiveHashTree(storage_,keypair,chunk_size_,nchunks_per_sign); // source 206 else 207 hashtree_ = new LiveHashTree(storage_,keypair,chunk_size_); //client 208 } else 209 hashtree_ = NULL; 210 } 211 212 213 LiveTransfer::~LiveTransfer() 214 { 215 if (picker_ != NULL) { 216 delete picker_; 217 picker_ = NULL; 218 } 219 220 GlobalDel(); 221 } 222 223 224 225 void LiveTransfer::GlobalAdd() 226 { 227 228 int idx = liveswarms.size(); 229 td_ = idx + TRANSFER_DESCR_LIVE_OFFSET; 230 231 if (liveswarms.size()<idx+1) 232 liveswarms.resize(idx+1); 233 liveswarms[idx] = this; 234 } 235 236 237 void LiveTransfer::GlobalDel() 238 { 239 int idx = td_ - TRANSFER_DESCR_LIVE_OFFSET; 240 liveswarms[idx] = NULL; 241 } 242 243 244 LiveTransfer *LiveTransfer::FindByTD(int td) 245 { 246 int idx = td - TRANSFER_DESCR_LIVE_OFFSET; 247 return idx<liveswarms.size() ? (LiveTransfer *)liveswarms[idx] : NULL; 248 } 249 250 LiveTransfer* LiveTransfer::FindBySwarmID(const SwarmID& swarmid) 251 { 252 for (int i=0; i<liveswarms.size(); i++) 253 if (liveswarms[i] && liveswarms[i]->swarm_id()==swarmid) 254 return liveswarms[i]; 255 return NULL; 256 } 257 258 259 tdlist_t LiveTransfer::GetTransferDescriptors() 260 { 261 tdlist_t tds; 262 for (int i=0; i<liveswarms.size(); i++) 263 if (liveswarms[i] != NULL) 264 tds.push_back(i+TRANSFER_DESCR_LIVE_OFFSET); 265 return tds; 266 } 267 268 269 270 271 uint64_t LiveTransfer::SeqComplete() 272 { 273 274 if (am_source_) { 275 uint64_t seqc = ack_out()->find_empty().base_offset(); 276 return seqc*chunk_size_; 277 } 278 bin_t hpos = ((LivePiecePicker *)picker())->GetHookinPos(); 279 bin_t cpos = ((LivePiecePicker *)picker())->GetCurrentPos(); 280 if (hpos == bin_t::NONE || cpos == bin_t::NONE) 281 return 0; 282 else { 283 uint64_t seqc = cpos.layer_offset() - hpos.layer_offset(); 284 return seqc*chunk_size_; 285 } 286 } 287 288 289 uint64_t LiveTransfer::GetHookinOffset() 290 { 291 292 bin_t hpos = ((LivePiecePicker *)picker())->GetHookinPos(); 293 uint64_t seqc = hpos.layer_offset(); 294 return seqc*chunk_size_; 295 } 296 297 298 299 300 int LiveTransfer::AddData(const void *buf, uint32_t nbyte) 301 { 302 // fprintf(stderr,"%s live: AddData: writing to storage %" PRIu64 "\n", tintstr(), nbyte); 303 304 // Save chunk on disk 305 int ret = storage_->Write(buf,nbyte,offset_); 306 if (ret < 0) { 307 print_error("live: create: error writing to storage"); 308 return ret; 309 } 310 //else 311 // fprintf(stderr,"%s live: AddData: stored %d bytes\n", tintstr(), ret ); 312 313 uint64_t till = std::max((uint32_t)1,nbyte/chunk_size_); 314 bool newepoch=false; 315 for (uint64_t c=0; c<till; c++) { 316 // New chunk is here 317 bin_t chunkbin(0,last_chunkid_); 318 ack_out_.set(chunkbin); 319 320 last_chunkid_++; 321 offset_ += chunk_size_; 322 323 // SIGNPEAK 324 if (def_hs_out_.cont_int_prot_ == POPT_CONT_INT_PROT_UNIFIED_MERKLE) { 325 LiveHashTree *umt = (LiveHashTree *)hashtree(); 326 uint32_t bufidx = c*chunk_size_; 327 char *bufptr = ((char *)buf)+bufidx; 328 uint32_t s = std::min(chunk_size_,nbyte-bufidx); 329 // Build dynamic hash tree 330 umt->AddData(bufptr,s); 331 332 // Create new signed peaks after N chunks 333 // Note: this means that if we use a file as input, the last < N 334 // chunks never get announced. 335 chunks_since_sign_++; 336 if (chunks_since_sign_ == umt->GetNChunksPerSig()) { 337 BinHashSigTuple lasttup = umt->AddSignedMunro(); 338 // LIVECHECKPOINT 339 if (checkpoint_filename_.length() > 0) { 340 WriteCheckpoint(lasttup); 341 } 342 343 chunks_since_sign_ = 0; 344 newepoch = true; 345 346 // Arno, 2013-02-26: Can only send HAVEs covered by signed peaks 347 // At this point in time, peaks == signed peaks 348 UpdateSignedAckOut(); 349 350 // Forget old part of tree 351 if (def_hs_out_.live_disc_wnd_ != POPT_LIVE_DISC_WND_ALL) { 352 OnDataPruneTree(def_hs_out_,bin_t(0,last_chunkid_),umt->GetNChunksPerSig()); 353 } 354 } 355 } else 356 newepoch = true; 357 } 358 359 fprintf(stderr,"live: AddData: added till chunkid %" PRIi64 "\n", last_chunkid_); 360 dprintf("%s %%0 live: AddData: added till chunkid %" PRIi64 "\n", tintstr(), last_chunkid_); 361 362 // Arno, 2013-02-26: When UNIFIED_MERKLE chunks are published in batches 363 // of nchunks_per_sign_ 364 if (!newepoch) 365 return 0; 366 367 // Announce chunks to peers via HAVEs 368 fprintf(stderr,"live: AddData: announcing to " PRISIZET " channels\n", mychannels_.size()); 369 channels_t::iterator iter; 370 for (iter=mychannels_.begin(); iter!=mychannels_.end(); iter++) { 371 Channel *c = *iter; 372 //DDOS 373 if (c->is_established()) { 374 dprintf("%s %%0 live: AddData: send on channel %d\n", tintstr(), c->id()); 375 c->LiveSend(); 376 } 377 } 378 379 return 0; 380 } 381 382 383 void LiveTransfer::UpdateSignedAckOut() 384 { 385 // Arno, 2013-02-26: Can only send HAVEs covered by signed peaks 386 // At this point in time, peaks == signed peaks 387 LiveHashTree *umt = (LiveHashTree *)hashtree(); 388 389 signed_ack_out_.clear(); 390 for (int i=0; i<umt->peak_count(); i++) { 391 bin_t sigpeak = umt->peak(i); 392 signed_ack_out_.set(sigpeak); 393 394 //fprintf(stderr,"live: AddData: UMT: DOHAVE %s %s %s\n", sigpeak.str().c_str(), sigpeak.base_left().str().c_str(), sigpeak.base_right().str().c_str() ); 395 } 396 // LIVECHECKPOINT, see constructor 397 // SIGNMUNRO 398 if (checkpoint_bin_ != bin_t::NONE) { 399 for (int i=0; i<=checkpoint_bin_.layer_offset(); i++) { 400 bin_t clearbin(checkpoint_bin_.layer(),i); 401 signed_ack_out_.reset(clearbin); 402 403 //fprintf(stderr,"live: AddData: UMT: UNHAVE %s %s %s\n", clearbin.str().c_str(), clearbin.base_left().str().c_str(), clearbin.base_right().str().c_str() ); 404 } 405 // TEST 406 /* fprintf(stderr,"live: AddData: UMT: checkp %s\n", checkpoint_bin_.str().c_str() ); 407 bin_t bf = signed_ack_out_.find_filled(); 408 bin_t be = signed_ack_out_.find_empty(); 409 bin_t sbe = signed_ack_out_.find_empty(checkpoint_bin_); 410 411 fprintf(stderr,"live: AddData: UMT: bf %s be %s sbe %s\n", bf.str().c_str(), be.str().c_str(), sbe.str().c_str() ); 412 */ 413 } 414 } 415 416 417 418 void LiveTransfer::UpdateOperational() 419 { 420 } 421 422 423 binmap_t *LiveTransfer::ack_out_signed() 424 { 425 if (!am_source_ || hashtree() == NULL) 426 return &ack_out_; 427 428 // Arno, 2013-02-26: Cannot send HAVEs not covered by signed peak 429 return &signed_ack_out_; 430 } 431 432 binmap_t *LiveTransfer::ack_out() 433 { 434 if (GetDefaultHandshake().cont_int_prot_ == POPT_CONT_INT_PROT_UNIFIED_MERKLE) 435 return hashtree_->ack_out(); 436 else 437 return &ack_out_; // tree less, use local binmap. 438 } 439 440 441 void LiveTransfer::OnVerifiedMunroHash(bin_t munro, tint sourcet) 442 { 443 // Channel sendc received a correctly signed munro. 444 LiveHashTree *umt = (LiveHashTree *)hashtree(); 445 if (umt != NULL) 446 umt->SetNChunksPerSig(munro.base_length()); 447 448 // Arno, 2013-05-22: Hook-in using signed peaks in UMT. 449 LivePiecePicker *lpp = (LivePiecePicker *)picker_; 450 lpp->AddPeerMunro(munro, sourcet); 451 } 452 453 454 void LiveTransfer::OnDataPruneTree(Handshake &hs_out, bin_t pos, uint32_t nchunks2forget) 455 { 456 if (nchunks2forget < 1) // nchunks_per_sig_ unknown 457 return; 458 459 if (ack_out_right_basebin_ == bin_t::NONE || pos > ack_out_right_basebin_) 460 ack_out_right_basebin_ = pos; 461 else 462 return; // Don't prune if no change 463 464 uint64_t lastchunkid = ack_out_right_basebin_.layer_offset(); 465 466 int64_t oldcid = ((int64_t)lastchunkid - (int64_t)hs_out.live_disc_wnd_); 467 if (oldcid > 0) { 468 // Find subtree left of window with width nchunks2forget that can be pruned 469 uint64_t extracid = oldcid % nchunks2forget; 470 uint64_t startcid = oldcid - extracid; 471 int64_t leftcid = ((int64_t)startcid - (int64_t)nchunks2forget); 472 if (leftcid >= 0) { 473 bin_t leftpos(0,leftcid); 474 475 bin_t::uint_t nchunks_per_sign_layer = (bin_t::uint_t)log2((double)nchunks2forget); 476 for (int h=0; h<nchunks_per_sign_layer; h++) { 477 leftpos = leftpos.parent(); 478 } 479 480 // Find biggest subtree to remove 481 if (leftpos.is_right()) { 482 while (leftpos.parent().is_right()) { 483 leftpos = leftpos.parent(); 484 } 485 } 486 //fprintf(stderr,"live: OnDataPruneTree: prune %s log %lf nchunks %d window %" PRIu64 " when %" PRIu64 "\n", leftpos.str().c_str(), log2((double)lastchunkid), nchunks2forget, hs_out.live_disc_wnd_, lastchunkid ); 487 LiveHashTree *umt = (LiveHashTree *)hashtree(); 488 umt->PruneTree(leftpos); 489 } 490 } 491 } 492 493 494 int LiveTransfer::WriteCheckpoint(BinHashSigTuple &munrotup) 495 { 496 // FORMAT: (layer,layeroff) munrohash-in-hex timestamp munrosig-in-hex\n 497 char tscstr[256]; 498 sprintf(tscstr,"%" PRIi64 "",munrotup.sigtint().time()); 499 std::string s = munrotup.bin().str()+" "+munrotup.hash().hex()+" "+std::string(tscstr)+" " 500 +munrotup.sigtint().sig().hex()+"\n"; 501 char *cstr = new char[strlen(s.c_str())+1]; 502 503 strcpy(cstr,s.c_str()); 504 505 // TODO: atomic? 506 int fd = open_utf8(checkpoint_filename_.c_str(),OPENFLAGS,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); 507 if (fd < 0) { 508 print_error("could not write checkpoint file"); 509 delete cstr; 510 return fd; 511 } 512 int ret = write(fd,cstr,strlen(cstr)); 513 if (ret < 0) { 514 print_error("could not write live checkpoint data"); 515 delete cstr; 516 return ret; 517 } 518 delete cstr; 519 ret = close(fd); 520 521 return ret; 522 } 523 524 525 BinHashSigTuple LiveTransfer::ReadCheckpoint() 526 { 527 // TODO: atomic? 528 int fd = open_utf8(checkpoint_filename_.c_str(),ROOPENFLAGS,S_IRUSR|S_IWUSR|S_IRGRP|S_IROTH); 529 if (fd < 0) { 530 print_error("could not read live checkpoint file"); 531 return BinHashSigTuple::NOBULL; 532 } 533 char buffer[1024]; 534 int ret = read(fd,buffer,1024); 535 if (ret < 0) { 536 print_error("could not read live checkpoint data"); 537 return BinHashSigTuple::NOBULL; 538 } 539 close(fd); 540 541 // Overwrite \n with end-of-string 542 buffer[ret-1] = '\0'; 543 std::string pstr(buffer); 544 545 std::string binstr; 546 std::string hashstr; 547 std::string timestr; 548 std::string sigstr; 549 int sidx = pstr.find(" "); 550 if (sidx == std::string::npos) { 551 print_error("could not parsing live checkpoint: no bin"); 552 return BinHashSigTuple::NOBULL; 553 } else { 554 binstr = pstr.substr(0,sidx); 555 int midx = pstr.find(" ",sidx+1); 556 if (midx == std::string::npos) { 557 print_error("could not parsing live checkpoint: no hash"); 558 return BinHashSigTuple::NOBULL; 559 } else { 560 hashstr = pstr.substr(sidx+1,midx-sidx-1); 561 int m2idx = pstr.find(" ",midx+1); 562 if (m2idx == std::string::npos) { 563 print_error("could not parsing live checkpoint: no timestamp"); 564 return BinHashSigTuple::NOBULL; 565 } else { 566 timestr = pstr.substr(midx+1,m2idx-midx-1); 567 sigstr = pstr.substr(m2idx+1); 568 } 569 } 570 } 571 572 sidx = binstr.find(","); 573 if (sidx == std::string::npos) { 574 print_error("could not parsing live checkpoint: bin bad"); 575 return BinHashSigTuple::NOBULL; 576 } 577 578 579 //fprintf(stderr,"CHECKPOINT read <%s> <%s> <%s> <%s>\n", binstr.c_str(), hashstr.c_str(), timestr.c_str(), sigstr.c_str() ); 580 581 std::string layerstr=binstr.substr(1,sidx-1); 582 std::string layeroffstr=binstr.substr(sidx+1,binstr.length()-sidx-2); 583 584 int layer; 585 ret = sscanf(layerstr.c_str(),"%d",&layer); 586 if (ret != 1) { 587 print_error("could not parsing live checkpoint: bin layer bad"); 588 return BinHashSigTuple::NOBULL; 589 } 590 bin_t::uint_t layeroff; 591 ret = sscanf(layeroffstr.c_str(),"%llu",&layeroff); 592 if (ret != 1) { 593 print_error("could not parsing live checkpoint: bin layer off bad"); 594 return BinHashSigTuple::NOBULL; 595 } 596 597 598 tint munrotimestamp; 599 ret = sscanf(timestr.c_str(),"%" SCNi64 "",&munrotimestamp); 600 if (ret != 1) { 601 print_error("could not parsing live checkpoint: timestamp bad"); 602 return BinHashSigTuple::NOBULL; 603 } 604 605 606 bin_t munrobin(layer,layeroff); 607 Sha1Hash munrohash = Sha1Hash(true,hashstr.c_str()); 608 Signature munrosig = Signature(true,(const uint8_t *)sigstr.c_str(),(uint16_t)sigstr.length()); 609 SigTintTuple munrost = SigTintTuple(munrosig,munrotimestamp); 610 611 //fprintf(stderr,"CHECKPOINT parsed <%s> %d %" SCNu64 " <%s> <%s> %" PRIi64 " <%s>\n", munrobin.str().c_str(), layer, layeroff, munrohash.hex().c_str(), timestr.c_str(), munrotimestamp, munrosig.hex().c_str() ); 612 613 return BinHashSigTuple(munrobin,munrohash,munrost); 614 } 615 616 617 bin_t LiveTransfer::GetSourceCurrentPos() 618 { 619 return bin_t(0,last_chunkid_); 620 } 621 622 623 /* 624 * Channel extensions for live 625 */ 626 627 void Channel::LiveSend() 628 { 629 //fprintf(stderr,"live: LiveSend: channel %d\n", id() ); 630 631 if (evsendlive_ptr_ == NULL) { 632 evsendlive_ptr_ = new struct event; 633 // Arno, 2013-02-01: Don't reassign, causes crashes. 634 evtimer_assign(evsendlive_ptr_,evbase,&Channel::LibeventSendCallback,this); 635 } 636 //fprintf(stderr,"live: LiveSend: next %" PRIi64 "\n", next_send_time_ ); 637 evtimer_add(evsendlive_ptr_,tint2tv(next_send_time_)); 638 } 639 640