1#!/usr/bin/env python
2# vim:ts=4:sw=4:et:
3import inspect
4import unittest
5import os
6from pywatchman import bser, pybser, SocketTimeout, WatchmanError
7
8
9class TestSocketTimeout(unittest.TestCase):
10    def test_exception_handling(self):
11        try:
12            raise SocketTimeout('should not raise')
13        except WatchmanError:
14            pass
15
16
17class TestBSERDump(unittest.TestCase):
18    # bser_mod will be None during discovery
19    def __init__(self, method_name, bser_mod=None):
20        super(TestBSERDump, self).__init__(method_name)
21        if bser_mod:
22            self._test_name = '%s.%s [%s]' % (
23                self.__class__.__name__, method_name, bser_mod.__name__)
24        else:
25            self._test_name = None
26        self.bser_mod = bser_mod
27
28    @staticmethod
29    def parameterize(loader, bser_mod):
30        suite = unittest.TestSuite()
31        for method_name in loader.getTestCaseNames(TestBSERDump):
32            suite.addTest(TestBSERDump(method_name, bser_mod))
33        return suite
34
35    def id(self):
36        if self._test_name:
37            return self._test_name
38        else:
39            return super(TestBSERDump, self).id()
40
41    def roundtrip(self, val, mutable=True):
42        enc = self.bser_mod.dumps(val)
43        print "# %s  -->  %s" % (val, enc.encode('hex'))
44        dec = self.bser_mod.loads(enc, mutable)
45        self.assertEquals(val, dec)
46
47    def munged(self, val, munged):
48        enc = self.bser_mod.dumps(val)
49        if isinstance(val, unicode):
50            print "# %s  -->  %s" % (val.encode('utf-8'), enc.encode('hex'))
51        else:
52            print "# %s  -->  %s" % (val, enc.encode('hex'))
53        dec = self.bser_mod.loads(enc)
54        self.assertEquals(munged, dec)
55
56    def test_int(self):
57        self.roundtrip(1)
58        self.roundtrip(0x100)
59        self.roundtrip(0x10000)
60        self.roundtrip(0x10000000)
61        self.roundtrip(0x1000000000)
62
63    def test_negative_int(self):
64        self.roundtrip(-0x80)
65        self.roundtrip(-0x8000)
66        self.roundtrip(-0x80000000)
67        self.roundtrip(-0x8000000000000000L)
68
69    def test_float(self):
70        self.roundtrip(1.5)
71
72    def test_bool(self):
73        self.roundtrip(True)
74        self.roundtrip(False)
75
76    def test_none(self):
77        self.roundtrip(None)
78
79    def test_string(self):
80        self.roundtrip("hello")
81        self.roundtrip(u'Hello')
82        ustr = u'\xe4\xf6\xfc'
83        self.munged(ustr, ustr.encode('utf-8'))
84
85    def test_list(self):
86        self.roundtrip([1, 2, 3])
87        self.roundtrip([1, "helo", 2.5, False, None, True, 3])
88
89    def test_tuple(self):
90        self.munged((1, 2, 3), [1, 2, 3])
91        self.roundtrip((1, 2, 3), mutable=False)
92
93    def test_dict(self):
94        self.roundtrip({"hello": "there"})
95        obj = self.bser_mod.loads(self.bser_mod.dumps({"hello": "there"}), False)
96        self.assertEquals(1, len(obj))
97        self.assertEquals('there', obj.hello)
98        self.assertEquals('there', obj['hello'])
99        self.assertEquals('there', obj[0])
100        hello, = obj  # sequence/list assignment
101        self.assertEquals('there', hello)
102
103    def assertItemAttributes(self, dictish, attrish):
104        self.assertEquals(len(dictish), len(attrish))
105        for k, v in dictish.iteritems():
106            self.assertEquals(v, getattr(attrish, k))
107
108    def test_template(self):
109        # since we can't generate the template bser output, here's a
110        # a blob from the C test suite in watchman
111        templ = "\x00\x01\x03\x28" + \
112                "\x0b\x00\x03\x02\x02\x03\x04\x6e\x61\x6d\x65\x02" + \
113                "\x03\x03\x61\x67\x65\x03\x03\x02\x03\x04\x66\x72" + \
114                "\x65\x64\x03\x14\x02\x03\x04\x70\x65\x74\x65\x03" + \
115                "\x1e\x0c\x03\x19"
116        dec = self.bser_mod.loads(templ)
117        exp = [
118            {"name": "fred", "age": 20},
119            {"name": "pete", "age": 30},
120            {"name": None, "age": 25}
121        ]
122        self.assertEquals(exp, dec)
123        res = self.bser_mod.loads(templ, False)
124
125        for i in range(0, len(exp)):
126            self.assertItemAttributes(exp[i], res[i])
127
128    def test_pdu_len(self):
129        enc = self.bser_mod.dumps(1)
130        self.assertEquals(len(enc), self.bser_mod.pdu_len(enc))
131
132        # try a bigger one; prove that we get the correct length
133        # even though we receive just a portion of the complete
134        # data
135        enc = self.bser_mod.dumps([1, 2, 3, "hello there, much larger"])
136        self.assertEquals(len(enc), self.bser_mod.pdu_len(enc[0:7]))
137
138    def test_garbage(self):
139        with self.assertRaises(ValueError):
140            self.bser_mod.loads("\x00\x01\n")
141
142        with self.assertRaises(ValueError):
143            self.bser_mod.loads('\x00\x01\x04\x01\x00\x02')
144
145def load_tests(loader, test_methods=None, pattern=None):
146    suite = unittest.TestSuite()
147    suite.addTests(loader.loadTestsFromTestCase(TestSocketTimeout))
148    for bser_mod in (bser, pybser):
149        suite.addTest(TestBSERDump.parameterize(loader, bser_mod))
150    return suite
151
152if __name__ == '__main__':
153    suite = load_tests(unittest.TestLoader())
154    unittest.TextTestRunner().run(suite)
155