1# Unix SMB/CIFS implementation.
2# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
3# Copyright (C) Catalyst IT Ltd. 2017
4#
5# This program is free software; you can redistribute it and/or modify
6# it under the terms of the GNU General Public License as published by
7# the Free Software Foundation; either version 3 of the License, or
8# (at your option) any later version.
9#
10# This program is distributed in the hope that it will be useful,
11# but WITHOUT ANY WARRANTY; without even the implied warranty of
12# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13# GNU General Public License for more details.
14#
15# You should have received a copy of the GNU General Public License
16# along with this program.  If not, see <http://www.gnu.org/licenses/>.
17#
18
19"""
20    Tests that exercise the auth logging for a successful netlogon attempt
21
22    NOTE: As the netlogon authentication is performed once per session,
23          there is only one test in this routine.  If another test is added
24          only the test executed first will generate the netlogon auth message
25"""
26
27import samba.tests
28import os
29from samba.samdb import SamDB
30import samba.tests.auth_log_base
31from samba.credentials import Credentials
32from samba.dcerpc import netlogon
33from samba.dcerpc.dcerpc import AS_SYSTEM_MAGIC_PATH_TOKEN
34from samba.auth import system_session
35from samba.tests import delete_force
36from samba.dsdb import UF_WORKSTATION_TRUST_ACCOUNT, UF_PASSWD_NOTREQD
37from samba.dcerpc.misc import SEC_CHAN_WKSTA
38from samba.compat import text_type
39from samba.dcerpc.windows_event_ids import (
40    EVT_ID_SUCCESSFUL_LOGON,
41    EVT_LOGON_NETWORK
42)
43
44
class AuthLogTestsNetLogon(samba.tests.auth_log_base.AuthLogTestBase):
    """Auth logging test for a successful netlogon (schannel) connection.

    setUp creates a workstation trust account in the SamDB, the test
    connects to the netlogon pipe over ncalrpc using that account's
    credentials, and the collected log messages are then checked.
    """

    def setUp(self):
        """Open the SamDB and create the machine account used by the test."""
        super(AuthLogTestsNetLogon, self).setUp()
        self.lp = samba.tests.env_loadparm()
        self.session = system_session()
        self.ldb = SamDB(
            session_info=self.session,
            lp=self.lp)

        self.domain = os.environ["DOMAIN"]
        self.netbios_name = "NetLogonGood"
        self.machinepass = "abcdefghij"
        self.remoteAddress = AS_SYSTEM_MAGIC_PATH_TOKEN
        self.base_dn = self.ldb.domain_dn()
        self.dn = ("cn=%s,cn=users,%s" % (self.netbios_name, self.base_dn))

        # unicodePwd is supplied as the double-quoted password encoded as
        # UTF-16-LE.
        utf16pw = text_type('"' + self.machinepass + '"').encode('utf-16-le')
        self.ldb.add({
            "dn": self.dn,
            "objectclass": "computer",
            "sAMAccountName": "%s$" % self.netbios_name,
            "userAccountControl":
                str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PASSWD_NOTREQD),
            "unicodePwd": utf16pw})

    def tearDown(self):
        """Run the base class tearDown, then delete the machine account.

        The delete lives in a ``finally`` clause so the account created in
        setUp is removed even if the base class tearDown raises.
        """
        try:
            super(AuthLogTestsNetLogon, self).tearDown()
        finally:
            delete_force(self.ldb, self.dn)

    def _test_netlogon(self, binding, checkFunction):
        """Open a schannel netlogon connection and check the log messages.

        :param binding: extra binding option(s) appended after "schannel"
            in the ncalrpc binding string; a false value means "schannel"
            only.
        :param checkFunction: callable invoked with the list of messages
            returned by waitForMessages.
        """

        def isLastExpectedMessage(msg):
            # The schannel-sealed DCE/RPC Authorization is the last message
            # this test waits for.
            return (
                msg["type"] == "Authorization" and
                msg["Authorization"]["serviceDescription"] == "DCE/RPC" and
                msg["Authorization"]["authType"] == "schannel" and
                msg["Authorization"]["transportProtection"] == "SEAL")

        if binding:
            binding = "[schannel,%s]" % binding
        else:
            binding = "[schannel]"

        machine_creds = Credentials()
        machine_creds.guess(self.get_loadparm())
        machine_creds.set_secure_channel_type(SEC_CHAN_WKSTA)
        machine_creds.set_password(self.machinepass)
        machine_creds.set_username(self.netbios_name + "$")

        netlogon_conn = netlogon.netlogon("ncalrpc:%s" % binding,
                                          self.get_loadparm(),
                                          machine_creds)

        # NOTE(review): the connection is handed to waitForMessages,
        # presumably so it stays referenced while messages are collected;
        # confirm against AuthLogTestBase.
        messages = self.waitForMessages(isLastExpectedMessage, netlogon_conn)
        checkFunction(messages)

    def netlogon_check(self, messages):
        """Assert that the messages match a successful netlogon.

        Expects exactly five messages; the first must be the ncalrpc
        DCE/RPC Authorization and the fourth the NETLOGON
        ServerAuthenticate Authentication.

        :param messages: list of decoded log messages (dicts).
        """

        expected_messages = 5
        self.assertEqual(expected_messages,
                         len(messages),
                         "Did not receive the expected number of messages")

        # Check the first message it should be an Authorization
        msg = messages[0]
        self.assertEqual("Authorization", msg["type"])
        self.assertEqual("DCE/RPC",
                         msg["Authorization"]["serviceDescription"])
        self.assertEqual("ncalrpc", msg["Authorization"]["authType"])
        self.assertEqual("NONE", msg["Authorization"]["transportProtection"])
        self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))

        # Check the fourth message it should be a NETLOGON Authentication
        msg = messages[3]
        self.assertEqual("Authentication", msg["type"])
        self.assertEqual("NETLOGON",
                         msg["Authentication"]["serviceDescription"])
        self.assertEqual("ServerAuthenticate",
                         msg["Authentication"]["authDescription"])
        self.assertEqual("NT_STATUS_OK",
                         msg["Authentication"]["status"])
        self.assertEqual("HMAC-SHA256",
                         msg["Authentication"]["passwordType"])
        self.assertEqual(EVT_ID_SUCCESSFUL_LOGON,
                         msg["Authentication"]["eventId"])
        self.assertEqual(EVT_LOGON_NETWORK,
                         msg["Authentication"]["logonType"])

    def test_netlogon(self):
        """Run the netlogon check over a sealed schannel connection."""
        self._test_netlogon("SEAL", self.netlogon_check)
136