1# Written by Arno Bakker 2# see LICENSE.txt for license information 3 4import unittest 5 6import os 7import sys 8import tempfile 9import random 10import shutil 11import time 12import subprocess 13import urllib2 14import string 15import binascii 16from traceback import print_exc 17from sha import sha 18 19from activatetest import TestDirSeedFramework 20from testasserver import TestAsServer 21from SwiftDef import SwiftDef 22from swiftconn import * 23 24DEBUG=False 25 26CHUNKSIZE=1024 27EMPTYHASH='\x00' * 20 28 29def crtup_cmp(x,y): 30 a = x[0] 31 b = y[0] 32 #print >>sys.stderr,"crtup_cmp:",a,b 33 34 if a[0] <= b[0] and b[1] <= a[1]: # b contained in a 35 return 1 # b smaller 36 if b[0] <= a[0] and a[1] <= b[1]: # a contained in b 37 return -1 # a smaller 38 if a[1] <= b[0]: 39 return -1 # a ends before b starts 40 if b[1] <= a[0]: 41 return 1 # b ends before a starts 42 if a == b: 43 return 0 44 else: 45 print >>sys.stderr,"\n\nNOT YET IMPLEMENTED crtup_cmp a b",a,b 46 47 48def check_hashes(hashdict,unclelist): 49 hash = None 50 pair = [ [unclelist[0],hashdict[unclelist[0]]], [unclelist[1],hashdict[unclelist[1]]] ] 51 i = 2 52 while True: 53 # order left right 54 pair.sort(cmp=crtup_cmp) 55 # calc root hash of parent 56 #print >>sys.stderr,"Comparing",pair[0][0],pair[1][0] 57 hash = sha(pair[0][1]+pair[1][1]).digest() 58 # calc chunkspec of parent 59 crtup = (pair[0][0][0],pair[1][0][1]) 60 #print >>sys.stderr,"Parent",crtup 61 parent = [crtup,hash] 62 # Add to hashdict, sender will expect this. 63 hashdict[crtup] = hash 64 # repeat recursive hash with parent and its sibling 65 if i >= len(unclelist): 66 break 67 pair = [parent,[unclelist[i],hashdict[unclelist[i]]]] 68 i += 1 69 return hash 70 71 72def check_peak_hashes(hashdict,peaklist): 73 righthash = EMPTYHASH 74 i = 0 75 # Build up hash tree starting from lowest peak hash, combining it 76 # with a right-side empty hash until it has the same size tree as 77 # covered by the next peak, until we have combined with the last peak, 78 # in which case the top hash should be the root hash. 79 # 80 lefthash = hashdict[peaklist[i]] 81 gotwidth = peaklist[i][1] - peaklist[i][0] +1 82 while True: 83 hash = sha(lefthash+righthash).digest() 84 gotwidth *= 2 85 if i == len(peaklist)-1: 86 break 87 wantwidth = peaklist[i+1][1] - peaklist[i+1][0] +1 88 if gotwidth >= wantwidth: 89 # Our tree is now as big as the next available peak, 90 # so we can combine those 91 i += 1 92 lefthash = hashdict[peaklist[i]] 93 righthash = hash 94 else: 95 # Our tree still small, increase by assuming all empty 96 # hashes on the right side 97 lefthash = hash 98 righthash = EMPTYHASH 99 return hash 100 101 102 103 104class TestZeroSeedFramework(TestAsServer): 105 """ 106 Framework for seeding tests using zero-state transfers. Copy of 107 activatetest.py/TestDirSeedFramework 108 """ 109 110 def setUpPreSession(self): 111 TestAsServer.setUpPreSession(self) 112 self.httpport = None 113 self.zerosdir = os.path.join(os.getcwd(),"zeros2") 114 self.exitwait = 36 115 self.progress = True 116 117 self.setUpZerosDir() 118 119 try: 120 shutil.rmtree(self.zerosdir) 121 except: 122 pass 123 os.mkdir(self.zerosdir) 124 125 # Create content 126 for i in range(len(self.filelist)): 127 fn = self.filelist[i][0] 128 s = self.filelist[i][1] 129 f = open(fn,"wb") 130 data = '#' * s 131 f.write(data) 132 f.close() 133 134 # Pre hash check and checkpoint them 135 sdef = SwiftDef() 136 sdef.add_content(fn) 137 sdef.finalize(self.binpath) 138 self.filelist[i][2] = sdef.get_id() # save roothash 139 140 # Hack: now copy to <roothash>[.mhash|.mbinmap] files such that zerostate works 141 for i in range(len(self.filelist)): 142 print >>sys.stderr,"test: Configuring",binascii.hexlify(self.filelist[i][2]) 143 srcfile = self.filelist[i][0] 144 dstfile = os.path.join(self.zerosdir,binascii.hexlify(self.filelist[i][2])) 145 shutil.copyfile(srcfile,dstfile) 146 shutil.copyfile(srcfile+".mhash",dstfile+".mhash") 147 shutil.copyfile(srcfile+".mbinmap",dstfile+".mbinmap") 148 149 150 def setUpZerosDir(self): 151 self.filelist = [] 152 # Minimum 1 entry 153 # DO NOT MODIFY THESE ENTRIES without adjusting tests, e.g. requesttest.py 154 self.filelist.append([os.path.join(self.zerosdir,"anita.ts"), 1234, None]) 155 self.filelist.append([os.path.join(self.zerosdir,"bill.ts"), 200487, None]) 156 self.filelist.append([os.path.join(self.zerosdir,"claire.ts"),65535, None]) 157 158 def setUpPostSession(self): 159 TestAsServer.setUpPostSession(self) 160 161 162 def tearDown(self): 163 # Wait 30+ second such that zerostate cleanup code gets covered. 164 time.sleep(self.exitwait) 165 166 TestAsServer.tearDown(self) 167 try: 168 shutil.rmtree(self.zerosdir) 169 except: 170 print_exc() 171 172 173 174class TestRequestFramework: # subclassed below 175 """ 176 Framework of tests doing REQUESTs (and CANCELs) to a swift process. 177 """ 178 179 def tst_request_one(self): 180 myaddr = ("127.0.0.1",15357) 181 hisaddr = ("127.0.0.1",self.listenport) 182 183 # Request from claire.ts 184 fidx = len(self.filelist)-1 185 swarmid = self.filelist[fidx][2] 186 187 s = SwiftConnection(myaddr,hisaddr,swarmid) 188 189 d = s.recv() 190 s.c.recv(d) 191 192 # Request single chunk DATA 193 d = s.makeDatagram() 194 #killer = ChannelID.from_bytes('\x05\xacH\xa0') 195 #d.add( killer ) 196 d.add( RequestMessage(ChunkRange(0,0)) ) 197 s.send(d) 198 199 # Recv DATA 200 print >>sys.stderr,"test: Waiting for response" 201 time.sleep(1) 202 203 # clair.ts is 64K exactly 204 peaklist = [(0,63)] 205 # Uncles for chunk 0. MUST be sorted in the uncle order 206 unclelist = [(1,1),(2,3),(4,7),(8,15),(16,31),(32,63)] 207 expunclelist = peaklist + unclelist 208 hashdict = {} 209 d = s.recv() 210 while True: 211 msg = d.get_message() 212 if msg is None: 213 break 214 print >>sys.stderr,"test: Got",`msg` 215 if msg.get_id() == MSG_ID_INTEGRITY: 216 crtup = (msg.chunkspec.s,msg.chunkspec.e) 217 if crtup in expunclelist: # test later 218 expunclelist.remove(crtup) 219 hashdict[crtup] = msg.intbytes 220 if msg.get_id() == MSG_ID_DATA: 221 self.assertEquals(ChunkRange(0,0).to_bytes(),msg.chunkspec.to_bytes()) 222 filename = self.filelist[fidx][0] 223 f = open(filename,"rb") 224 225 expchunk = f.read(CHUNKSIZE) 226 f.close() 227 self.assertEquals(expchunk,msg.chunk) 228 hash = sha(expchunk).digest() 229 hashdict[(0,0)] = hash 230 231 # See if we got necessary peak + uncle hashes 232 self.assertEquals([],expunclelist) 233 234 # Check peak hashes 235 self.assertEquals(hashdict[peaklist[0]],swarmid) 236 237 # See if they add up to the root hash 238 gothash = check_hashes(hashdict,[(0,0)]+unclelist) 239 self.assertEquals(swarmid,gothash) 240 241 # Send Ack + explicit close 242 d = s.makeDatagram() 243 d.add( AckMessage(ChunkRange(0,0),TimeStamp(1234L)) ) 244 d.add( HandshakeMessage(CHAN_ID_ZERO,POPT_VER_PPSP) ) 245 s.c.send(d) 246 247 248 249 250 def tst_request_one_middle(self): 251 myaddr = ("127.0.0.1",5354) 252 hisaddr = ("127.0.0.1",self.listenport) 253 254 # Request from bill.ts 255 fidx = 1 256 swarmid = self.filelist[fidx][2] 257 # bill.ts is 195.788 chunks, 3 peaks [0,127], ... 258 # MUST be sorted low to high level 259 peaklist = [(192,195),(128,191),(0,127)] 260 261 262 s = SwiftConnection(myaddr,hisaddr,swarmid) 263 264 d = s.recv() 265 s.c.recv(d) 266 267 # Request DATA 268 d = s.makeDatagram() 269 d.add( RequestMessage(ChunkRange(67,67)) ) 270 s.c.send(d) 271 272 # Recv hashes and chunk 67 273 self.get_bill_67(s,fidx,swarmid,peaklist) 274 275 # Send Ack + explicit close 276 d = s.makeDatagram() 277 d.add( AckMessage(ChunkRange(67,67),TimeStamp(1234L)) ) 278 d.add( HandshakeMessage(CHAN_ID_ZERO,POPT_VER_PPSP) ) 279 s.c.send(d) 280 281 282 283 def get_bill_67(self,s,fidx,swarmid,peaklist): 284 285 # Uncles for chunk 67. MUST be sorted in the uncle order 286 unclelist = [(66,66),(64,65),(68,71),(72,79),(80,95),(96,127),(0,63)] 287 expunclelist = peaklist + unclelist 288 hashdict = {} 289 290 # Recv DATA 291 print >>sys.stderr,"test: Waiting for response" 292 d = s.recv() 293 294 while True: 295 msg = d.get_message() 296 if msg is None: 297 break 298 print >>sys.stderr,"test: Got",`msg` 299 if msg.get_id() == MSG_ID_INTEGRITY: 300 crtup = (msg.chunkspec.s,msg.chunkspec.e) 301 if crtup in expunclelist: # test later 302 expunclelist.remove(crtup) 303 hashdict[crtup] = msg.intbytes 304 if msg.get_id() == MSG_ID_DATA: 305 self.assertEquals(ChunkRange(67,67).to_bytes(),msg.chunkspec.to_bytes()) 306 filename = self.filelist[fidx][0] 307 f = open(filename,"rb") 308 expchunk = f.read(CHUNKSIZE) 309 f.close() 310 self.assertEquals(expchunk,msg.chunk) 311 hash = sha(expchunk).digest() 312 hashdict[(67,67)] = hash 313 314 # See if we got necessary peak + uncle hashes 315 self.assertEquals([],expunclelist) 316 317 # Check peak hashes against root hash 318 gothash = check_peak_hashes(hashdict,peaklist) 319 self.assertEquals(swarmid,gothash) 320 321 # See if they add up to the covering peak hash, now that they are OK. 322 gothash = check_hashes(hashdict,[(67,67)]+unclelist) 323 exphash = hashdict[peaklist[len(peaklist)-1]] 324 self.assertEquals(exphash,gothash) 325 326 return hashdict 327 328 def tst_request_two(self): 329 330 print >>sys.stderr,"test: test_request_two" 331 332 myaddr = ("127.0.0.1",5356) 333 hisaddr = ("127.0.0.1",self.listenport) 334 335 # Request from bill.ts 336 fidx = 1 337 swarmid = self.filelist[fidx][2] 338 # bill.ts is 195.788 chunks, 3 peaks [0,127], ... 339 # MUST be sorted low to high level 340 peaklist = [(192,195),(128,191),(0,127)] 341 342 s = SwiftConnection(myaddr,hisaddr,swarmid) 343 344 d = s.recv() 345 s.c.recv(d) 346 347 # Request 2 chunks of DATA 348 d = s.makeDatagram() 349 d.add( RequestMessage(ChunkRange(67,68)) ) # ask 2 chunks 350 s.c.send(d) 351 352 # Recv hashes and chunk 67 353 hashdict = self.get_bill_67(s,fidx,swarmid,peaklist) # SHOULD process sequentially 354 355 # Send Ack 67 356 d = s.makeDatagram() 357 d.add( AckMessage(ChunkRange(67,67),TimeStamp(1234L)) ) 358 s.c.send(d) 359 360 # Recv hashes and chunk 68 361 self.get_bill_68(s,fidx,swarmid,peaklist,hashdict) # SHOULD process sequentially 362 363 # Send Ack + explicit close 364 d = s.makeDatagram() 365 d.add( AckMessage(ChunkRange(67,68),TimeStamp(1234L)) ) 366 d.add( HandshakeMessage(CHAN_ID_ZERO,POPT_VER_PPSP) ) 367 s.c.send(d) 368 369 370 371 def get_bill_68(self,s,fidx,swarmid,peaklist,hashdict): 372 373 # bill.ts is 195.788 chunks, 3 peaks [0,127], ... 374 # Uncles for chunk 67. MUST be sorted in the uncle order 375 realunclelist = [(69,69),(70,71),(64,67),(72,79),(80,95),(96,127),(0,63)] 376 recvunclelist = [(69,69),(70,71)] # already known is [(64,67),(72,79),(80,95),(96,127),(0,63)] 377 expunclelist = recvunclelist # peaklist already sent before 378 379 # Recv DATA 380 print >>sys.stderr,"test: Waiting for response" 381 d = s.recv() 382 while True: 383 msg = d.get_message() 384 if msg is None: 385 break 386 print >>sys.stderr,"test: Got",`msg` 387 if msg.get_id() == MSG_ID_INTEGRITY: 388 crtup = (msg.chunkspec.s,msg.chunkspec.e) 389 if crtup in expunclelist: # test later 390 expunclelist.remove(crtup) 391 hashdict[crtup] = msg.intbytes 392 if msg.get_id() == MSG_ID_DATA: 393 self.assertEquals(ChunkRange(68,68).to_bytes(),msg.chunkspec.to_bytes()) 394 filename = self.filelist[fidx][0] 395 f = open(filename,"rb") 396 expchunk = f.read(CHUNKSIZE) 397 f.close() 398 self.assertEquals(expchunk,msg.chunk) 399 hash = sha(expchunk).digest() 400 hashdict[(68,68)] = hash 401 402 # See if we got necessary peak + uncle hashes 403 self.assertEquals([],expunclelist) 404 405 # See if they add up to the covering peak hash, now that they are OK. 406 407 gothash = check_hashes(hashdict,[(68,68)]+realunclelist) 408 exphash = hashdict[peaklist[len(peaklist)-1]] 409 self.assertEquals(exphash,gothash) 410 411 412 def tst_request_two_cancel_2nd(self): 413 myaddr = ("127.0.0.1",5356) 414 hisaddr = ("127.0.0.1",self.listenport) 415 416 # Request from bill.ts 417 fidx = 1 418 swarmid = self.filelist[fidx][2] 419 # bill.ts is 195.788 chunks, 3 peaks [0,127], ... 420 # MUST be sorted low to high level 421 peaklist = [(192,195),(128,191),(0,127)] 422 423 s = SwiftConnection(myaddr,hisaddr,swarmid) 424 425 d = s.recv() 426 s.c.recv(d) 427 428 # Request DATA 429 d = s.makeDatagram() 430 d.add( RequestMessage(ChunkRange(67,68)) ) # ask 2 chunks 431 s.c.send(d) 432 433 434 # Cancel 68 435 d = s.makeDatagram() 436 d.add( CancelMessage(ChunkRange(68,68)) ) # cancel 1 chunk 437 s.c.send(d) 438 439 # Recv hashes and chunk 67 440 hashdict = self.get_bill_67(s,fidx,swarmid,peaklist) # SHOULD process sequentially 441 442 # Send Ack 67 443 d = s.makeDatagram() 444 d.add( AckMessage(ChunkRange(67,67),TimeStamp(1234L)) ) 445 s.c.send(d) 446 447 # Now we shouldn't get 68 448 gotdata = False 449 d = s.recv() 450 while True: 451 msg = d.get_message() 452 if msg is None: 453 break 454 print >>sys.stderr,"test: Got",`msg` 455 if msg.get_id() == MSG_ID_DATA: 456 self.assertEquals(ChunkRange(68,68).to_bytes(),msg.chunkspec.to_bytes()) 457 gotdata = True 458 459 self.assertFalse(gotdata) 460 461 # Send explicit close 462 d = s.makeDatagram() 463 d.add( HandshakeMessage(CHAN_ID_ZERO,POPT_VER_PPSP) ) 464 s.c.send(d) 465 466 467 def tst_request_many_cancel_some1(self): 468 469 print >>sys.stderr,"test: test_request_many_cancel_some1" 470 471 # Request from bill.ts 472 fidx = 1 473 474 # Ask many chunks 475 reqcp = ChunkRange(67,100) 476 477 # Cancel some chunks 478 cancelcplist = [] 479 cancelcplist.append(ChunkRange(69,69)) 480 cancelcplist.append(ChunkRange(75,78)) 481 cancelcplist.append(ChunkRange(80,80)) 482 cancelcplist.append(ChunkRange(96,99)) 483 484 # What chunks still expected 485 expcplist = [] 486 expcplist += [ChunkRange(i,i) for i in range(67,69)] 487 expcplist += [ChunkRange(i,i) for i in range(70,75)] 488 expcplist += [ChunkRange(i,i) for i in range(79,80)] 489 expcplist += [ChunkRange(i,i) for i in range(81,96)] 490 expcplist += [ChunkRange(100,100)] 491 492 self.do_request_many_cancel_some(fidx, reqcp, cancelcplist, expcplist) 493 494 495 def tst_request_many_cancel_some2(self): 496 497 print >>sys.stderr,"test: test_request_many_cancel_some2" 498 499 # Request from clair.ts 500 fidx = 2 501 502 # Ask many chunks 503 reqcp = ChunkRange(0,30) 504 505 # Cancel some chunks 506 cancelcplist = [] 507 cancelcplist.append(ChunkRange(4,5)) 508 cancelcplist.append(ChunkRange(8,12)) 509 cancelcplist.append(ChunkRange(16,22)) 510 cancelcplist.append(ChunkRange(24,27)) 511 cancelcplist.append(ChunkRange(30,30)) 512 513 # What chunks still expected 514 expcplist = [] 515 expcplist += [ChunkRange(i,i) for i in range( 0,3+1)] 516 expcplist += [ChunkRange(i,i) for i in range( 6,7+1)] 517 expcplist += [ChunkRange(i,i) for i in range(13,15+1)] 518 expcplist += [ChunkRange(i,i) for i in range(23,23+1)] 519 expcplist += [ChunkRange(i,i) for i in range(28,29+1)] 520 521 self.do_request_many_cancel_some(fidx, reqcp, cancelcplist, expcplist) 522 523 524 525 def do_request_many_cancel_some(self,fidx,reqcp,cancelcplist,expcplist): 526 myaddr = ("127.0.0.1",5356) 527 hisaddr = ("127.0.0.1",self.listenport) 528 529 # Request from fidx 530 swarmid = self.filelist[fidx][2] 531 532 s = SwiftConnection(myaddr,hisaddr,swarmid) 533 534 d = s.recv() 535 s.c.recv(d) 536 537 # Request range of chunks 538 d = s.makeDatagram() 539 d.add( RequestMessage(reqcp) ) # ask many chunks 540 s.send(d) 541 542 543 # Cancel some chunks 544 d = s.makeDatagram() 545 for cp in cancelcplist: 546 d.add( CancelMessage(cp) ) 547 s.send(d) 548 549 # Receive uncanceled chunks 550 cidx = 0 551 for attempt in range(0,reqcp.e - reqcp.s): 552 d = s.recv() 553 while True: 554 expchunkspec = expcplist[cidx] 555 msg = d.get_message() 556 if msg is None: 557 break 558 print >>sys.stderr,"test: Wait for uncanceled, Got",`msg` 559 if msg.get_id() == MSG_ID_DATA: 560 self.assertEquals(expchunkspec.to_bytes(),msg.chunkspec.to_bytes()) 561 cidx += 1 562 563 # Send ACK 564 d = s.makeDatagram() 565 d.add( AckMessage(expchunkspec,TimeStamp(1234L)) ) 566 s.send(d) 567 print >>sys.stderr,"test: ACK",expchunkspec 568 break 569 if cidx == len(expcplist): 570 break 571 572 # Should only receive non-CANCELed chunks. 573 self.assertEquals(cidx,len(expcplist)) 574 575 # Send explicit close 576 d = s.makeDatagram() 577 d.add( HandshakeMessage(CHAN_ID_ZERO,POPT_VER_PPSP) ) 578 s.send(d) 579 580 581class TestRequestFullState(TestDirSeedFramework,TestRequestFramework): 582 """ 583 Do request test on normal swarms, activated in memory. 584 """ 585 def test_request_one(self): 586 return self.tst_request_one() 587 588 def test_request_one_middle(self): 589 return self.tst_request_one_middle() 590 591 def test_request_two(self): 592 return self.tst_request_two() 593 594 def test_request_two_cancel_2nd(self): 595 return self.tst_request_two_cancel_2nd() 596 597 def test_request_many_cancel_some1(self): 598 return self.tst_request_many_cancel_some1() 599 600 def test_request_many_cancel_some2(self): 601 return self.tst_request_many_cancel_some2() 602 603 604class TestRequestZeroState(TestZeroSeedFramework,TestRequestFramework): 605 """ 606 Do same request test on zero-state swarms, where hashes 607 are served directly from disk. 608 """ 609 def test_request_one(self): 610 return self.tst_request_one() 611 612 def test_request_one_middle(self): 613 return self.tst_request_one_middle() 614 615 def test_request_two(self): 616 return self.tst_request_two() 617 618 def test_request_two_cancel_2nd(self): 619 return self.tst_request_two_cancel_2nd() 620 621 def test_request_many_cancel_some1(self): 622 return self.tst_request_many_cancel_some1() 623 624 def test_request_many_cancel_some2(self): 625 return self.tst_request_many_cancel_some2() 626 627 628 629 630def test_suite(): 631 suite = unittest.TestSuite() 632 suite.addTest(unittest.makeSuite(TestRequestFullState)) 633 suite.addTest(unittest.makeSuite(TestRequestZeroState)) 634 635 return suite 636 637 638def main(): 639 unittest.main(defaultTest='test_suite',argv=[sys.argv[0]]) 640 641if __name__ == "__main__": 642 main() 643 644