# Unix SMB/CIFS implementation.
# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2017
# Copyright (C) Catalyst IT Ltd. 2017
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

"""
    Tests that exercise the auth logging for a successful netlogon attempt

    NOTE: As the netlogon authentication is performed once per session,
          there is only one test in this routine.  If another test is added
          only the test executed first will generate the netlogon auth message
"""

import samba.tests
import os
from samba.samdb import SamDB
import samba.tests.auth_log_base
from samba.credentials import Credentials
from samba.dcerpc import netlogon
from samba.dcerpc.dcerpc import AS_SYSTEM_MAGIC_PATH_TOKEN
from samba.auth import system_session
from samba.tests import delete_force
from samba.dsdb import UF_WORKSTATION_TRUST_ACCOUNT, UF_PASSWD_NOTREQD
from samba.dcerpc.misc import SEC_CHAN_WKSTA
from samba.compat import text_type
from samba.dcerpc.windows_event_ids import (
    EVT_ID_SUCCESSFUL_LOGON,
    EVT_LOGON_NETWORK
)


class AuthLogTestsNetLogon(samba.tests.auth_log_base.AuthLogTestBase):
    """Check the auth log messages emitted by a successful netlogon.

    Each test runs against a temporary computer account that setUp()
    creates and tearDown() removes.
    """

    def setUp(self):
        """Open the SamDB and create the machine account used by the test.

        The domain name is read from the DOMAIN environment variable, so
        a KeyError is raised if it is not set.
        """
        super(AuthLogTestsNetLogon, self).setUp()
        self.lp = samba.tests.env_loadparm()
        self.session = system_session()
        self.ldb = SamDB(
            session_info=self.session,
            lp=self.lp)

        self.domain = os.environ["DOMAIN"]
        self.netbios_name = "NetLogonGood"
        self.machinepass = "abcdefghij"
        self.remoteAddress = AS_SYSTEM_MAGIC_PATH_TOKEN
        self.base_dn = self.ldb.domain_dn()
        self.dn = ("cn=%s,cn=users,%s" % (self.netbios_name, self.base_dn))

        # unicodePwd is supplied as the password wrapped in double quotes
        # and encoded as UTF-16-LE.
        utf16pw = text_type('"' + self.machinepass + '"').encode('utf-16-le')
        self.ldb.add({
            "dn": self.dn,
            "objectclass": "computer",
            "sAMAccountName": "%s$" % self.netbios_name,
            "userAccountControl":
                str(UF_WORKSTATION_TRUST_ACCOUNT | UF_PASSWD_NOTREQD),
            "unicodePwd": utf16pw})

    def tearDown(self):
        """Run the base class clean up, then delete the machine account."""
        super(AuthLogTestsNetLogon, self).tearDown()
        delete_force(self.ldb, self.dn)

    def _test_netlogon(self, binding, checkFunction):
        """Open a netlogon connection and check the messages it generates.

        :param binding: extra binding options appended after "schannel",
            e.g. "SEAL"; a false value requests plain "[schannel]".
        :param checkFunction: callable invoked with the list of collected
            messages; it is expected to perform the assertions.
        """

        def isLastExpectedMessage(msg):
            # Collection stops at the sealed schannel DCE/RPC Authorization.
            return (
                msg["type"] == "Authorization" and
                msg["Authorization"]["serviceDescription"] == "DCE/RPC" and
                msg["Authorization"]["authType"] == "schannel" and
                msg["Authorization"]["transportProtection"] == "SEAL")

        if binding:
            binding = "[schannel,%s]" % binding
        else:
            binding = "[schannel]"

        # Authenticate as the workstation account created in setUp().
        machine_creds = Credentials()
        machine_creds.guess(self.get_loadparm())
        machine_creds.set_secure_channel_type(SEC_CHAN_WKSTA)
        machine_creds.set_password(self.machinepass)
        machine_creds.set_username(self.netbios_name + "$")

        netlogon_conn = netlogon.netlogon("ncalrpc:%s" % binding,
                                          self.get_loadparm(),
                                          machine_creds)

        messages = self.waitForMessages(isLastExpectedMessage, netlogon_conn)
        checkFunction(messages)

    def netlogon_check(self, messages):
        """Assert that messages describe a successful netlogon.

        Exactly five messages are expected.  Only the first (an ncalrpc
        Authorization) and the fourth (the NETLOGON Authentication) are
        inspected in detail.

        :param messages: list of decoded auth log messages (dictionaries).
        """

        expected_messages = 5
        self.assertEqual(expected_messages,
                         len(messages),
                         "Did not receive the expected number of messages")

        # Check the first message it should be an Authorization
        msg = messages[0]
        self.assertEqual("Authorization", msg["type"])
        self.assertEqual("DCE/RPC",
                         msg["Authorization"]["serviceDescription"])
        self.assertEqual("ncalrpc", msg["Authorization"]["authType"])
        self.assertEqual("NONE", msg["Authorization"]["transportProtection"])
        self.assertTrue(self.is_guid(msg["Authorization"]["sessionId"]))

        # Check the fourth message it should be a NETLOGON Authentication
        msg = messages[3]
        self.assertEqual("Authentication", msg["type"])
        self.assertEqual("NETLOGON",
                         msg["Authentication"]["serviceDescription"])
        self.assertEqual("ServerAuthenticate",
                         msg["Authentication"]["authDescription"])
        self.assertEqual("NT_STATUS_OK",
                         msg["Authentication"]["status"])
        self.assertEqual("HMAC-SHA256",
                         msg["Authentication"]["passwordType"])
        self.assertEqual(EVT_ID_SUCCESSFUL_LOGON,
                         msg["Authentication"]["eventId"])
        self.assertEqual(EVT_LOGON_NETWORK,
                         msg["Authentication"]["logonType"])

    def test_netlogon(self):
        """Successful netlogon over a sealed schannel connection."""
        self._test_netlogon("SEAL", self.netlogon_check)