1# Written by Arno Bakker
2# see LICENSE.txt for license information
3
4import unittest
5
6import os
7import sys
8import tempfile
9import random
10import shutil
11import time
12import subprocess
13import urllib2
14import string
15import binascii
16from traceback import print_exc
17from sha import sha
18
19from activatetest import TestDirSeedFramework
20from testasserver import TestAsServer
21from SwiftDef import SwiftDef
22from swiftconn import *
23
# Debug switch; not referenced by the code visible in this module.
DEBUG=False

# Number of bytes read from a content file to obtain one expected chunk.
CHUNKSIZE=1024
# 20 zero bytes (the length of a SHA-1 digest); used by check_peak_hashes()
# as the right-hand hash when padding a peak up to a larger tree.
EMPTYHASH='\x00' * 20
28
def crtup_cmp(x,y):
    """
    cmp()-style comparator that orders two hash tree entries left-to-right.

    x and y are sequences whose element [0] is an inclusive (start,end)
    chunk range; any further elements (e.g. the hash) are ignored.

    Returns 0 when both ranges are equal, -1 when x sorts before y and 1
    when y sorts before x. A range contained in the other one sorts first;
    otherwise the range that ends at or before the start of the other one
    sorts first.

    Ranges that overlap only partially are not supported: a diagnostic is
    written to stderr and None is returned.
    """
    a = x[0]
    b = y[0]

    # Must be tested first: two equal ranges also satisfy both containment
    # tests below, which would otherwise report them as unequal.
    if a == b:
        return 0
    if a[0] <= b[0] and b[1] <= a[1]: # b contained in a
        return 1 # b smaller
    if b[0] <= a[0] and a[1] <= b[1]: # a contained in b
        return -1 # a smaller
    if a[1] <= b[0]:
        return -1 # a ends before b starts
    if b[1] <= a[0]:
        return 1 # b ends before a starts

    sys.stderr.write("\n\nNOT YET IMPLEMENTED crtup_cmp a b %s %s\n" % (a,b))
    return None
46
47
def check_hashes(hashdict,unclelist):
    """
    Fold a chain of chunk ranges into the hash of their common ancestor.

    unclelist[0] is the starting node and every following entry is the
    sibling to combine with at the next level up; at least two entries are
    required. The hash of each range is looked up in hashdict. Every
    parent that gets computed is also stored in hashdict under its
    (start,end) range. Returns the hash of the last parent computed.
    """
    def combine(node,sibling):
        # Order the two entries left/right, hash them together and record
        # the resulting parent in hashdict (the sender will expect this).
        ordered = [node,sibling]
        ordered.sort(cmp=crtup_cmp)
        (leftspan,lefthash),(rightspan,righthash) = ordered
        digest = sha(lefthash+righthash).digest()
        span = (leftspan[0],rightspan[1])
        hashdict[span] = digest
        return [span,digest]

    node = combine([unclelist[0],hashdict[unclelist[0]]],
                   [unclelist[1],hashdict[unclelist[1]]])
    # Climb the tree: combine the running parent with each remaining uncle
    for idx in range(2,len(unclelist)):
        uncle = unclelist[idx]
        node = combine(node,[uncle,hashdict[uncle]])
    return node[1]
70
71
def check_peak_hashes(hashdict,peaklist):
    """
    Combine the peak hashes in peaklist into a single top hash.

    peaklist holds inclusive (start,end) ranges, lowest peak first; their
    hashes are looked up in hashdict. Starting from the first peak, the
    running hash is paired with EMPTYHASH on its right until the tree it
    covers is at least as wide as the next peak, at which point that peak
    becomes the left child and the running hash the right child. The hash
    computed once the last peak has been taken in is returned.

    Note that one hashing round always happens, so for a single peak the
    result is sha(peakhash + EMPTYHASH), not the peak hash itself.
    """
    def width(peak):
        # Number of chunks covered by an inclusive range
        return peak[1] - peak[0] +1

    left,right = hashdict[peaklist[0]],EMPTYHASH
    covered = width(peaklist[0])
    idx = 0
    while True:
        top = sha(left+right).digest()
        covered *= 2
        if idx == len(peaklist)-1:
            return top
        if covered >= width(peaklist[idx+1]):
            # Tree is now as big as the next peak: put that peak on the left
            idx += 1
            left,right = hashdict[peaklist[idx]],top
        else:
            # Still too small: grow by assuming empty hashes on the right
            left,right = top,EMPTYHASH
100
101
102
103
class TestZeroSeedFramework(TestAsServer):
    """
    Framework for seeding tests using zero-state transfers. Copy of
    activatetest.py/TestDirSeedFramework

    setUpPreSession() (re)creates the directory "zeros2" under the current
    working directory, writes the files listed by setUpZerosDir() into it,
    has SwiftDef compute each root hash and stores a copy of every file
    (plus its .mhash and .mbinmap) under the hex root hash as file name.
    tearDown() removes the directory again.
    """

    def setUpPreSession(self):
        """Create the zero-state content directory and record the root hashes."""
        TestAsServer.setUpPreSession(self)
        # NOTE(review): httpport/exitwait/progress are presumably also read
        # by TestAsServer; only exitwait is used in this class (tearDown).
        self.httpport = None
        self.zerosdir = os.path.join(os.getcwd(),"zeros2")
        self.exitwait = 36
        self.progress = True

        self.setUpZerosDir()

        # Start from an empty directory. A missing directory is fine;
        # anything that is not an OS-level error should not be hidden.
        try:
            shutil.rmtree(self.zerosdir)
        except OSError:
            pass
        os.mkdir(self.zerosdir)

        # Create content
        for entry in self.filelist:
            fn = entry[0]
            size = entry[1]
            with open(fn,"wb") as f:
                f.write('#' * size)

            # Pre hash check and checkpoint them
            sdef = SwiftDef()
            sdef.add_content(fn)
            sdef.finalize(self.binpath)
            entry[2] = sdef.get_id() # save roothash

        # Hack: now copy to <roothash>[.mhash|.mbinmap] files such that zerostate works
        for entry in self.filelist:
            hexid = binascii.hexlify(entry[2])
            print >>sys.stderr,"test: Configuring",hexid
            srcfile = entry[0]
            dstfile = os.path.join(self.zerosdir,hexid)
            shutil.copyfile(srcfile,dstfile)
            shutil.copyfile(srcfile+".mhash",dstfile+".mhash")
            shutil.copyfile(srcfile+".mbinmap",dstfile+".mbinmap")


    def setUpZerosDir(self):
        """Fill self.filelist with [path, size in bytes, roothash (None until hashed)] entries."""
        self.filelist = []
        # Minimum 1 entry
        # DO NOT MODIFY THESE ENTRIES without adjusting tests, e.g. requesttest.py
        self.filelist.append([os.path.join(self.zerosdir,"anita.ts"), 1234, None])
        self.filelist.append([os.path.join(self.zerosdir,"bill.ts"),  200487, None])
        self.filelist.append([os.path.join(self.zerosdir,"claire.ts"),65535, None])

    def setUpPostSession(self):
        TestAsServer.setUpPostSession(self)


    def tearDown(self):
        """Wait self.exitwait seconds, tear down the base class, then remove the content directory."""
        # Wait 30+ seconds such that zerostate cleanup code gets covered.
        time.sleep(self.exitwait)

        TestAsServer.tearDown(self)
        # Best-effort cleanup: report a failure but do not fail the test.
        try:
            shutil.rmtree(self.zerosdir)
        except Exception:
            print_exc()
171
172
173
174class TestRequestFramework: # subclassed below
175    """
176    Framework of tests doing REQUESTs (and CANCELs) to a swift process.
177    """
178
179    def tst_request_one(self):
180        myaddr = ("127.0.0.1",15357)
181        hisaddr = ("127.0.0.1",self.listenport)
182
183        # Request from claire.ts
184        fidx = len(self.filelist)-1
185        swarmid = self.filelist[fidx][2]
186
187        s = SwiftConnection(myaddr,hisaddr,swarmid)
188
189        d = s.recv()
190        s.c.recv(d)
191
192        # Request single chunk DATA
193        d = s.makeDatagram()
194        #killer = ChannelID.from_bytes('\x05\xacH\xa0')
195        #d.add( killer )
196        d.add( RequestMessage(ChunkRange(0,0)) )
197        s.send(d)
198
199        # Recv DATA
200        print >>sys.stderr,"test: Waiting for response"
201        time.sleep(1)
202
203        # clair.ts is 64K exactly
204        peaklist = [(0,63)]
205        # Uncles for chunk 0. MUST be sorted in the uncle order
206        unclelist = [(1,1),(2,3),(4,7),(8,15),(16,31),(32,63)]
207        expunclelist = peaklist + unclelist
208        hashdict = {}
209        d = s.recv()
210        while True:
211            msg = d.get_message()
212            if msg is None:
213                break
214            print >>sys.stderr,"test: Got",`msg`
215            if msg.get_id() == MSG_ID_INTEGRITY:
216                crtup = (msg.chunkspec.s,msg.chunkspec.e)
217                if crtup in expunclelist: # test later
218                    expunclelist.remove(crtup)
219                hashdict[crtup] = msg.intbytes
220            if msg.get_id() == MSG_ID_DATA:
221                self.assertEquals(ChunkRange(0,0).to_bytes(),msg.chunkspec.to_bytes())
222                filename = self.filelist[fidx][0]
223                f = open(filename,"rb")
224
225                expchunk = f.read(CHUNKSIZE)
226                f.close()
227                self.assertEquals(expchunk,msg.chunk)
228                hash = sha(expchunk).digest()
229                hashdict[(0,0)] = hash
230
231        # See if we got necessary peak + uncle hashes
232        self.assertEquals([],expunclelist)
233
234        # Check peak hashes
235        self.assertEquals(hashdict[peaklist[0]],swarmid)
236
237        # See if they add up to the root hash
238        gothash = check_hashes(hashdict,[(0,0)]+unclelist)
239        self.assertEquals(swarmid,gothash)
240
241        # Send Ack + explicit close
242        d = s.makeDatagram()
243        d.add( AckMessage(ChunkRange(0,0),TimeStamp(1234L)) )
244        d.add( HandshakeMessage(CHAN_ID_ZERO,POPT_VER_PPSP) )
245        s.c.send(d)
246
247
248
249
250    def tst_request_one_middle(self):
251        myaddr = ("127.0.0.1",5354)
252        hisaddr = ("127.0.0.1",self.listenport)
253
254        # Request from bill.ts
255        fidx = 1
256        swarmid = self.filelist[fidx][2]
257        # bill.ts is 195.788 chunks, 3 peaks [0,127], ...
258        # MUST be sorted low to high level
259        peaklist = [(192,195),(128,191),(0,127)]
260
261
262        s = SwiftConnection(myaddr,hisaddr,swarmid)
263
264        d = s.recv()
265        s.c.recv(d)
266
267        # Request DATA
268        d = s.makeDatagram()
269        d.add( RequestMessage(ChunkRange(67,67)) )
270        s.c.send(d)
271
272        # Recv hashes and chunk 67
273        self.get_bill_67(s,fidx,swarmid,peaklist)
274
275        # Send Ack + explicit close
276        d = s.makeDatagram()
277        d.add( AckMessage(ChunkRange(67,67),TimeStamp(1234L)) )
278        d.add( HandshakeMessage(CHAN_ID_ZERO,POPT_VER_PPSP) )
279        s.c.send(d)
280
281
282
283    def get_bill_67(self,s,fidx,swarmid,peaklist):
284
285        # Uncles for chunk 67. MUST be sorted in the uncle order
286        unclelist = [(66,66),(64,65),(68,71),(72,79),(80,95),(96,127),(0,63)]
287        expunclelist = peaklist + unclelist
288        hashdict = {}
289
290        # Recv DATA
291        print >>sys.stderr,"test: Waiting for response"
292        d = s.recv()
293
294        while True:
295            msg = d.get_message()
296            if msg is None:
297                break
298            print >>sys.stderr,"test: Got",`msg`
299            if msg.get_id() == MSG_ID_INTEGRITY:
300                crtup = (msg.chunkspec.s,msg.chunkspec.e)
301                if crtup in expunclelist: # test later
302                    expunclelist.remove(crtup)
303                hashdict[crtup] = msg.intbytes
304            if msg.get_id() == MSG_ID_DATA:
305                self.assertEquals(ChunkRange(67,67).to_bytes(),msg.chunkspec.to_bytes())
306                filename = self.filelist[fidx][0]
307                f = open(filename,"rb")
308                expchunk = f.read(CHUNKSIZE)
309                f.close()
310                self.assertEquals(expchunk,msg.chunk)
311                hash = sha(expchunk).digest()
312                hashdict[(67,67)] = hash
313
314        # See if we got necessary peak + uncle hashes
315        self.assertEquals([],expunclelist)
316
317        # Check peak hashes against root hash
318        gothash = check_peak_hashes(hashdict,peaklist)
319        self.assertEquals(swarmid,gothash)
320
321        # See if they add up to the covering peak hash, now that they are OK.
322        gothash = check_hashes(hashdict,[(67,67)]+unclelist)
323        exphash = hashdict[peaklist[len(peaklist)-1]]
324        self.assertEquals(exphash,gothash)
325
326        return hashdict
327
328    def tst_request_two(self):
329
330        print >>sys.stderr,"test: test_request_two"
331
332        myaddr = ("127.0.0.1",5356)
333        hisaddr = ("127.0.0.1",self.listenport)
334
335        # Request from bill.ts
336        fidx = 1
337        swarmid = self.filelist[fidx][2]
338        # bill.ts is 195.788 chunks, 3 peaks [0,127], ...
339        # MUST be sorted low to high level
340        peaklist = [(192,195),(128,191),(0,127)]
341
342        s = SwiftConnection(myaddr,hisaddr,swarmid)
343
344        d = s.recv()
345        s.c.recv(d)
346
347        # Request 2 chunks of DATA
348        d = s.makeDatagram()
349        d.add( RequestMessage(ChunkRange(67,68)) )  # ask 2 chunks
350        s.c.send(d)
351
352        # Recv hashes and chunk 67
353        hashdict = self.get_bill_67(s,fidx,swarmid,peaklist) # SHOULD process sequentially
354
355        # Send Ack 67
356        d = s.makeDatagram()
357        d.add( AckMessage(ChunkRange(67,67),TimeStamp(1234L)) )
358        s.c.send(d)
359
360        # Recv hashes and chunk 68
361        self.get_bill_68(s,fidx,swarmid,peaklist,hashdict) # SHOULD process sequentially
362
363        # Send Ack + explicit close
364        d = s.makeDatagram()
365        d.add( AckMessage(ChunkRange(67,68),TimeStamp(1234L)) )
366        d.add( HandshakeMessage(CHAN_ID_ZERO,POPT_VER_PPSP) )
367        s.c.send(d)
368
369
370
371    def get_bill_68(self,s,fidx,swarmid,peaklist,hashdict):
372
373        # bill.ts is 195.788 chunks, 3 peaks [0,127], ...
374        # Uncles for chunk 67. MUST be sorted in the uncle order
375        realunclelist = [(69,69),(70,71),(64,67),(72,79),(80,95),(96,127),(0,63)]
376        recvunclelist = [(69,69),(70,71)]  # already known is [(64,67),(72,79),(80,95),(96,127),(0,63)]
377        expunclelist = recvunclelist # peaklist already sent before
378
379        # Recv DATA
380        print >>sys.stderr,"test: Waiting for response"
381        d = s.recv()
382        while True:
383            msg = d.get_message()
384            if msg is None:
385                break
386            print >>sys.stderr,"test: Got",`msg`
387            if msg.get_id() == MSG_ID_INTEGRITY:
388                crtup = (msg.chunkspec.s,msg.chunkspec.e)
389                if crtup in expunclelist: # test later
390                    expunclelist.remove(crtup)
391                hashdict[crtup] = msg.intbytes
392            if msg.get_id() == MSG_ID_DATA:
393                self.assertEquals(ChunkRange(68,68).to_bytes(),msg.chunkspec.to_bytes())
394                filename = self.filelist[fidx][0]
395                f = open(filename,"rb")
396                expchunk = f.read(CHUNKSIZE)
397                f.close()
398                self.assertEquals(expchunk,msg.chunk)
399                hash = sha(expchunk).digest()
400                hashdict[(68,68)] = hash
401
402        # See if we got necessary peak + uncle hashes
403        self.assertEquals([],expunclelist)
404
405        # See if they add up to the covering peak hash, now that they are OK.
406
407        gothash = check_hashes(hashdict,[(68,68)]+realunclelist)
408        exphash = hashdict[peaklist[len(peaklist)-1]]
409        self.assertEquals(exphash,gothash)
410
411
412    def tst_request_two_cancel_2nd(self):
413        myaddr = ("127.0.0.1",5356)
414        hisaddr = ("127.0.0.1",self.listenport)
415
416        # Request from bill.ts
417        fidx = 1
418        swarmid = self.filelist[fidx][2]
419        # bill.ts is 195.788 chunks, 3 peaks [0,127], ...
420        # MUST be sorted low to high level
421        peaklist = [(192,195),(128,191),(0,127)]
422
423        s = SwiftConnection(myaddr,hisaddr,swarmid)
424
425        d = s.recv()
426        s.c.recv(d)
427
428        # Request DATA
429        d = s.makeDatagram()
430        d.add( RequestMessage(ChunkRange(67,68)) )  # ask 2 chunks
431        s.c.send(d)
432
433
434        # Cancel 68
435        d = s.makeDatagram()
436        d.add( CancelMessage(ChunkRange(68,68)) )  # cancel 1 chunk
437        s.c.send(d)
438
439        # Recv hashes and chunk 67
440        hashdict = self.get_bill_67(s,fidx,swarmid,peaklist) # SHOULD process sequentially
441
442        # Send Ack 67
443        d = s.makeDatagram()
444        d.add( AckMessage(ChunkRange(67,67),TimeStamp(1234L)) )
445        s.c.send(d)
446
447        # Now we shouldn't get 68
448        gotdata = False
449        d = s.recv()
450        while True:
451            msg = d.get_message()
452            if msg is None:
453                break
454            print >>sys.stderr,"test: Got",`msg`
455            if msg.get_id() == MSG_ID_DATA:
456                self.assertEquals(ChunkRange(68,68).to_bytes(),msg.chunkspec.to_bytes())
457                gotdata = True
458
459        self.assertFalse(gotdata)
460
461        # Send explicit close
462        d = s.makeDatagram()
463        d.add( HandshakeMessage(CHAN_ID_ZERO,POPT_VER_PPSP) )
464        s.c.send(d)
465
466
467    def tst_request_many_cancel_some1(self):
468
469        print >>sys.stderr,"test: test_request_many_cancel_some1"
470
471        # Request from bill.ts
472        fidx = 1
473
474        # Ask many chunks
475        reqcp = ChunkRange(67,100)
476
477        # Cancel some chunks
478        cancelcplist = []
479        cancelcplist.append(ChunkRange(69,69))
480        cancelcplist.append(ChunkRange(75,78))
481        cancelcplist.append(ChunkRange(80,80))
482        cancelcplist.append(ChunkRange(96,99))
483
484        # What chunks still expected
485        expcplist = []
486        expcplist += [ChunkRange(i,i) for i in range(67,69)]
487        expcplist += [ChunkRange(i,i) for i in range(70,75)]
488        expcplist += [ChunkRange(i,i) for i in range(79,80)]
489        expcplist += [ChunkRange(i,i) for i in range(81,96)]
490        expcplist += [ChunkRange(100,100)]
491
492        self.do_request_many_cancel_some(fidx, reqcp, cancelcplist, expcplist)
493
494
495    def tst_request_many_cancel_some2(self):
496
497        print >>sys.stderr,"test: test_request_many_cancel_some2"
498
499        # Request from clair.ts
500        fidx = 2
501
502        # Ask many chunks
503        reqcp = ChunkRange(0,30)
504
505        # Cancel some chunks
506        cancelcplist = []
507        cancelcplist.append(ChunkRange(4,5))
508        cancelcplist.append(ChunkRange(8,12))
509        cancelcplist.append(ChunkRange(16,22))
510        cancelcplist.append(ChunkRange(24,27))
511        cancelcplist.append(ChunkRange(30,30))
512
513        # What chunks still expected
514        expcplist = []
515        expcplist += [ChunkRange(i,i) for i in range( 0,3+1)]
516        expcplist += [ChunkRange(i,i) for i in range( 6,7+1)]
517        expcplist += [ChunkRange(i,i) for i in range(13,15+1)]
518        expcplist += [ChunkRange(i,i) for i in range(23,23+1)]
519        expcplist += [ChunkRange(i,i) for i in range(28,29+1)]
520
521        self.do_request_many_cancel_some(fidx, reqcp, cancelcplist, expcplist)
522
523
524
525    def do_request_many_cancel_some(self,fidx,reqcp,cancelcplist,expcplist):
526        myaddr = ("127.0.0.1",5356)
527        hisaddr = ("127.0.0.1",self.listenport)
528
529        # Request from fidx
530        swarmid = self.filelist[fidx][2]
531
532        s = SwiftConnection(myaddr,hisaddr,swarmid)
533
534        d = s.recv()
535        s.c.recv(d)
536
537        # Request range of chunks
538        d = s.makeDatagram()
539        d.add( RequestMessage(reqcp) )  # ask many chunks
540        s.send(d)
541
542
543        # Cancel some chunks
544        d = s.makeDatagram()
545        for cp in cancelcplist:
546            d.add( CancelMessage(cp) )
547        s.send(d)
548
549        # Receive uncanceled chunks
550        cidx = 0
551        for attempt in range(0,reqcp.e - reqcp.s):
552            d = s.recv()
553            while True:
554                expchunkspec = expcplist[cidx]
555                msg = d.get_message()
556                if msg is None:
557                    break
558                print >>sys.stderr,"test: Wait for uncanceled, Got",`msg`
559                if msg.get_id() == MSG_ID_DATA:
560                    self.assertEquals(expchunkspec.to_bytes(),msg.chunkspec.to_bytes())
561                    cidx += 1
562
563                    # Send ACK
564                    d = s.makeDatagram()
565                    d.add( AckMessage(expchunkspec,TimeStamp(1234L)) )
566                    s.send(d)
567                    print >>sys.stderr,"test: ACK",expchunkspec
568                    break
569            if cidx == len(expcplist):
570                break
571
572        # Should only receive non-CANCELed chunks.
573        self.assertEquals(cidx,len(expcplist))
574
575        # Send explicit close
576        d = s.makeDatagram()
577        d.add( HandshakeMessage(CHAN_ID_ZERO,POPT_VER_PPSP) )
578        s.send(d)
579
580
class TestRequestFullState(TestDirSeedFramework,TestRequestFramework):
    """
    Do request test on normal swarms, activated in memory.

    Fixture handling comes from TestDirSeedFramework, the test bodies from
    TestRequestFramework. Each test_* method below only forwards to the
    tst_* method of the same name.
    """
    # NOTE: no docstrings on the test_* methods on purpose; unittest would
    # print them instead of the method names.
    def test_request_one(self):
        return self.tst_request_one()

    def test_request_one_middle(self):
        return self.tst_request_one_middle()

    def test_request_two(self):
        return self.tst_request_two()

    def test_request_two_cancel_2nd(self):
        return self.tst_request_two_cancel_2nd()

    def test_request_many_cancel_some1(self):
        return self.tst_request_many_cancel_some1()

    def test_request_many_cancel_some2(self):
        return self.tst_request_many_cancel_some2()
602
603
class TestRequestZeroState(TestZeroSeedFramework,TestRequestFramework):
    """
    Do same request test on zero-state swarms, where hashes
    are served directly from disk.

    Fixture handling comes from TestZeroSeedFramework, the test bodies from
    TestRequestFramework. Each test_* method below only forwards to the
    tst_* method of the same name.
    """
    # NOTE: no docstrings on the test_* methods on purpose; unittest would
    # print them instead of the method names.
    def test_request_one(self):
        return self.tst_request_one()

    def test_request_one_middle(self):
        return self.tst_request_one_middle()

    def test_request_two(self):
        return self.tst_request_two()

    def test_request_two_cancel_2nd(self):
        return self.tst_request_two_cancel_2nd()

    def test_request_many_cancel_some1(self):
        return self.tst_request_many_cancel_some1()

    def test_request_many_cancel_some2(self):
        return self.tst_request_many_cancel_some2()
626
627
628
629
def test_suite():
    """Return a TestSuite holding the full-state tests followed by the zero-state tests."""
    combined = unittest.TestSuite()
    for testclass in (TestRequestFullState, TestRequestZeroState):
        combined.addTest(unittest.makeSuite(testclass))

    return combined
636
637
def main():
    """Run test_suite() via unittest, passing on only the program name from sys.argv."""
    # Drop any command line arguments so unittest does not interpret them
    program_only = [sys.argv[0]]
    unittest.main(defaultTest='test_suite',argv=program_only)
640
# Run the tests only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
643
644