1###############################################################################
2#  Tested so far:
3#
#  FWOpenPolicyStore
#  FWClosePolicyStore
5#
6#  Not yet:
7#
# Should not dump errors when run against a Windows 7 target.
9#
10################################################################################
11
12import unittest
13import ConfigParser
14
15from impacket.dcerpc.v5 import transport, epm, fasp
16from impacket.dcerpc.v5.rpcrt import RPC_C_AUTHN_LEVEL_PKT_INTEGRITY, RPC_C_AUTHN_LEVEL_PKT_PRIVACY, RPC_C_AUTHN_LEVEL_NONE
17from impacket.dcerpc.v5.ndr import NULL
18
19
class FASPTests(unittest.TestCase):
    """Tests for the FWOpenPolicyStore / FWClosePolicyStore calls of ``fasp``.

    This class does not configure itself: a subclass ``setUp`` must set the
    attributes read by ``connect`` -- ``stringBinding``, ``username``,
    ``password``, ``domain``, ``hashes`` and ``ts``.
    """

    def connect(self):
        """Open an authenticated DCE/RPC connection bound to the FASP interface.

        The connection is recorded so that ``tearDown`` can disconnect it.

        Returns:
            A ``(dce, rpctransport)`` tuple.
        """
        rpctransport = transport.DCERPCTransportFactory(self.stringBinding)
        if self.hashes:
            # A non-empty value is split on ':' into exactly two parts.
            lmhash, nthash = self.hashes.split(':')
        else:
            lmhash = ''
            nthash = ''
        if hasattr(rpctransport, 'set_credentials'):
            # This method exists only for selected protocol sequences.
            rpctransport.set_credentials(self.username, self.password, self.domain, lmhash, nthash)
        dce = rpctransport.get_dce_rpc()
        dce.set_auth_level(RPC_C_AUTHN_LEVEL_PKT_PRIVACY)
        dce.connect()
        # Register the connection before bind() so it is released by
        # tearDown() even when the bind or the test body raises.
        if not hasattr(self, '_connections'):
            self._connections = []
        self._connections.append(dce)
        dce.bind(fasp.MSRPC_UUID_FASP, transfer_syntax=self.ts)

        return dce, rpctransport

    def tearDown(self):
        """Disconnect every connection opened through ``connect``."""
        connections = getattr(self, '_connections', [])
        # Reset first so a failing disconnect cannot be retried on stale objects.
        self._connections = []
        for dce in connections:
            dce.disconnect()

    def test_FWOpenPolicyStore(self):
        """Send a hand-built FWOpenPolicyStore request and dump the response."""
        dce, rpctransport = self.connect()
        request = fasp.FWOpenPolicyStore()
        request['BinaryVersion'] = 0x0200
        request['StoreType'] = fasp.FW_STORE_TYPE.FW_STORE_TYPE_LOCAL
        request['AccessRight'] = fasp.FW_POLICY_ACCESS_RIGHT.FW_POLICY_ACCESS_RIGHT_READ
        request['dwFlags'] = 0
        resp = dce.request(request)
        resp.dump()

    def test_hFWOpenPolicyStore(self):
        """Open a policy store through the ``hFWOpenPolicyStore`` helper."""
        dce, rpctransport = self.connect()
        resp = fasp.hFWOpenPolicyStore(dce)
        resp.dump()

    def test_FWClosePolicyStore(self):
        """Open a policy store, then close it with a hand-built request."""
        dce, rpctransport = self.connect()
        resp = fasp.hFWOpenPolicyStore(dce)
        request = fasp.FWClosePolicyStore()
        # Close the handle returned by the open call above.
        request['phPolicyStore'] = resp['phPolicyStore']
        resp = dce.request(request)
        resp.dump()

    def test_hFWClosePolicyStore(self):
        """Open a policy store, then close it through ``hFWClosePolicyStore``."""
        dce, rpctransport = self.connect()
        resp = fasp.hFWOpenPolicyStore(dce)
        resp = fasp.hFWClosePolicyStore(dce, resp['phPolicyStore'])
        resp.dump()
67
class TCPTransport(FASPTests):
    """Runs the FASP tests over ``ncacn_ip_tcp`` with the first transfer syntax."""

    def setUp(self):
        """Read the target and credentials from ``dcetests.cfg``.

        Values come from the ``[TCPTransport]`` section. The string binding is
        then resolved through the endpoint mapper on ``self.machine``.
        """
        FASPTests.setUp(self)
        section = 'TCPTransport'
        config = ConfigParser.ConfigParser()
        config.read('dcetests.cfg')
        # (attribute name, config option) pairs, read in this order.
        settings = (
            ('username', 'username'),
            ('domain', 'domain'),
            ('serverName', 'servername'),
            ('password', 'password'),
            ('machine', 'machine'),
            ('hashes', 'hashes'),
        )
        for attribute, option in settings:
            setattr(self, attribute, config.get(section, option))
        self.stringBinding = epm.hept_map(
            self.machine, fasp.MSRPC_UUID_FASP, protocol='ncacn_ip_tcp')
        # Transfer syntax (UUID, version) that connect() hands to bind().
        self.ts = ('8a885d04-1ceb-11c9-9fe8-08002b104860', '2.0')
81
class TCPTransport64(FASPTests):
    """Runs the FASP tests over ``ncacn_ip_tcp`` with the second transfer syntax.

    NOTE(review): the class name suggests this is the NDR64 transfer syntax --
    confirm against the impacket documentation.
    """

    def setUp(self):
        """Read the target and credentials from ``dcetests.cfg``.

        This class reuses the ``[TCPTransport]`` section; only the transfer
        syntax in ``self.ts`` differs from ``TCPTransport``.
        """
        FASPTests.setUp(self)
        parser = ConfigParser.ConfigParser()
        parser.read('dcetests.cfg')

        def option(name):
            # Every value is read from the shared [TCPTransport] section.
            return parser.get('TCPTransport', name)

        self.username = option('username')
        self.domain = option('domain')
        self.serverName = option('servername')
        self.password = option('password')
        self.machine = option('machine')
        self.hashes = option('hashes')
        self.stringBinding = epm.hept_map(
            self.machine, fasp.MSRPC_UUID_FASP, protocol='ncacn_ip_tcp')
        # Transfer syntax (UUID, version) that connect() hands to bind().
        self.ts = ('71710533-BEBA-4937-8319-B5DBEF9CCC36', '1.0')
95
# Script entry point: the optional first argument names the test case class
# to run; with no argument both TCP transport test cases are run.
if __name__ == '__main__':
    import sys
    loader = unittest.TestLoader()
    if len(sys.argv) > 1:
        # Look the requested class up by name among this module's globals.
        requested = sys.argv[1]
        suite = loader.loadTestsFromTestCase(globals()[requested])
    else:
        suite = loader.loadTestsFromTestCase(TCPTransport)
        suite.addTests(loader.loadTestsFromTestCase(TCPTransport64))
    runner = unittest.TextTestRunner(verbosity=1)
    runner.run(suite)
106