# Unix SMB/CIFS implementation.
# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2019
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
#

"""
    auth logging tests that exercise winbind
"""

import json
import os
import time
import traceback

from samba.auth import system_session
from samba.credentials import Credentials
from samba.compat import get_string, get_bytes
from samba.dcerpc.messaging import AUTH_EVENT_NAME, MSG_AUTH_LOG
from samba.dsdb import UF_NORMAL_ACCOUNT
from samba.messaging import Messaging
from samba.param import LoadParm
from samba.samdb import SamDB
from samba.tests import delete_force, BlackboxProcessError, BlackboxTestCase
from samba.tests.auth_log_base import AuthLogTestBase

USER_NAME = "WBALU"


class AuthLogTestsWinbind(AuthLogTestBase, BlackboxTestCase):
    """Authentication logging tests for logons that go through winbind.

    Each test runs a command line tool, checks the authentication messages
    collected by waitForMessages() and then checks that the Domain
    Controller logged a matching SamLogon authentication.
    """

    #
    # Helper function to watch for authentication messages on the
    # Domain Controller.
    #
    def dc_watcher(self):
        """Fork a process that collects SamLogon messages on the DC.

        The child listens for authentication events for one second using
        the configuration named by the DC_SERVERCONFFILE environment
        variable. It then writes every SamLogon authentication message it
        saw to a pipe, one JSON document per line, or the single line
        "None" when there were none.

        Returns the read end of that pipe. The descriptor is closed
        automatically when the test finishes.
        """
        (r1, w1) = os.pipe()
        pid = os.fork()
        if pid != 0:
            # Parent process: only the child writes, so drop our copy of
            # the write end.  Without this the reader could never see EOF.
            os.close(w1)
            self.addCleanup(os.close, r1)
            # Return the result pipe to the caller.
            return r1

        # Child process.  It must never return into the caller's stack,
        # otherwise a second copy of the test run would carry on, so every
        # path out of this block ends in os._exit().
        exit_status = 1
        try:
            os.close(r1)

            # Load the lp context for the Domain Controller, rather than the
            # member server.
            config_file = os.environ["DC_SERVERCONFFILE"]
            lp_ctx = LoadParm()
            lp_ctx.load(config_file)

            #
            # Is the message a SamLogon authentication?
            def is_sam_logon(m):
                if m is None:
                    return False
                msg = json.loads(m)
                return (
                    msg["type"] == "Authentication" and
                    msg["Authentication"]["serviceDescription"] ==
                    "SamLogon")

            #
            # Handler function for received authentication messages.
            def message_handler(context, msgType, src, message):
                # Print the message to help debugging the tests.
                # as it's a JSON message it does not look like a sub-unit
                # message.
                print(message)
                self.dc_msgs.append(message)

            # The handler appends to this list, so create it before the
            # handler is registered.
            self.dc_msgs = []

            # Set up a messaging context to listen for authentication events
            # on the domain controller.
            msg_ctx = Messaging((1,), lp_ctx=lp_ctx)
            msg_ctx.irpc_add_name(AUTH_EVENT_NAME)
            msg_handler_and_context = (message_handler, None)
            msg_ctx.register(msg_handler_and_context, msg_type=MSG_AUTH_LOG)

            # Wait for the SamLogon message.
            # As there could be other SamLogon's in progress we need to
            # collect all the SamLogons and let the caller match them to the
            # session.
            start_time = time.time()
            while (time.time() - start_time < 1):
                msg_ctx.loop_once(0.1)

            # Only interested in SamLogon messages, filter out the rest
            msgs = [m for m in self.dc_msgs if is_sam_logon(m)]
            if msgs:
                for m in msgs:
                    os.write(w1, get_bytes(m + "\n"))
            else:
                os.write(w1, get_bytes("None\n"))
            os.close(w1)

            msg_ctx.deregister(msg_handler_and_context,
                               msg_type=MSG_AUTH_LOG)
            msg_ctx.irpc_remove_name(AUTH_EVENT_NAME)
            exit_status = 0
        except Exception:
            # Report the problem; the parent will see an empty pipe and
            # fail the test.
            traceback.print_exc()
        finally:
            os._exit(exit_status)

    # Remove any DCE/RPC ncacn_np messages
    # these only get triggered once per session, and stripping them out
    # avoids ordering dependencies in the tests
    #
    def filter_messages(self, messages):
        """Return messages without the DCE/RPC ncacn_np Authorizations."""
        def keep(msg):
            return not (
                msg["type"] == "Authorization" and
                msg["Authorization"]["serviceDescription"] == "DCE/RPC" and
                msg["Authorization"]["authType"] == "ncacn_np")

        return [msg for msg in messages if keep(msg)]

    def setUp(self):
        super(AuthLogTestsWinbind, self).setUp()
        self.domain = os.environ["DOMAIN"]
        self.host = os.environ["SERVER"]
        self.dc = os.environ["DC_SERVER"]
        self.lp = self.get_loadparm()
        self.credentials = self.get_credentials()
        self.session = system_session()

        self.ldb = SamDB(
            url="ldap://{0}".format(self.dc),
            session_info=self.session,
            credentials=self.credentials,
            lp=self.lp)
        self.create_user_account()

    def tearDown(self):
        super(AuthLogTestsWinbind, self).tearDown()
        delete_force(self.ldb, self.user_dn)

    #
    # Create a test user account
    def create_user_account(self):
        """Create the USER_NAME account and credentials that refer to it."""
        self.user_pass = self.random_password()
        self.user_name = USER_NAME
        self.user_dn = "cn=%s,%s" % (self.user_name, self.ldb.domain_dn())

        # remove the account if it exists, this will happen if a previous test
        # run failed
        delete_force(self.ldb, self.user_dn)

        utf16pw = ('"%s"' % get_string(self.user_pass)).encode('utf-16-le')
        self.ldb.add({
            "dn": self.user_dn,
            "objectclass": "user",
            "sAMAccountName": "%s" % self.user_name,
            "userAccountControl": str(UF_NORMAL_ACCOUNT),
            "unicodePwd": utf16pw})

        self.user_creds = Credentials()
        self.user_creds.guess(self.get_loadparm())
        self.user_creds.set_password(self.user_pass)
        self.user_creds.set_username(self.user_name)
        self.user_creds.set_workstation(self.server)

    #
    # Check that the domain server received a SamLogon request for the
    # current logon.
    #
    def check_domain_server_authentication(self, pipe, logon_id, description):
        """Check the DC logged a SamLogon authentication for logon_id.

        pipe        -- read end of the pipe returned by dc_watcher()
        logon_id    -- logonId of the winbind authentication message
        description -- expected authDescription of the SamLogon message

        Fails the test when the watcher reported no messages, or none of
        them carries logon_id.
        """
        # Read until the watcher closes its end of the pipe, a single read
        # could return only part of the output.
        chunks = []
        while True:
            chunk = os.read(pipe, 8192)
            if not chunk:
                break
            chunks.append(chunk)
        messages = get_string(b"".join(chunks))

        #
        # Look for the SamLogon request matching logon_id
        msg = None
        for line in messages.split("\n"):
            line = line.strip()
            # Every line is newline terminated, so the split leaves an
            # empty trailing entry, and "None" is the watcher's marker for
            # "nothing seen".  Neither is JSON.
            if not line or line == "None":
                continue
            candidate = json.loads(line)
            if logon_id == candidate["Authentication"]["logonId"]:
                msg = candidate
                break

        if msg is None:
            self.fail("No Domain server authentication message")

        #
        # Validate that message contains the expected data
        #
        self.assertEqual("Authentication", msg["type"])
        self.assertEqual(logon_id, msg["Authentication"]["logonId"])
        self.assertEqual("SamLogon",
                         msg["Authentication"]["serviceDescription"])
        self.assertEqual(description,
                         msg["Authentication"]["authDescription"])

    # Build the predicate that tells waitForMessages() when to stop.
    @staticmethod
    def _winbind_auth_matcher(description):
        """Return a predicate matching a winbind Authentication message
        whose authDescription starts with description."""
        def is_last_expected_message(msg):
            return (
                msg["type"] == "Authentication" and
                msg["Authentication"]["serviceDescription"] == "winbind" and
                msg["Authentication"]["authDescription"] is not None and
                msg["Authentication"]["authDescription"].startswith(
                    description))
        return is_last_expected_message

    # Collect, filter and count the messages produced by one command.
    def _wait_for_winbind_messages(self, last_description, expected_messages):
        """Wait for the message described by last_description and return
        the filtered messages, asserting there are expected_messages."""
        messages = self.waitForMessages(
            self._winbind_auth_matcher(last_description))
        messages = self.filter_messages(messages)
        self.assertEqual(expected_messages,
                         len(messages),
                         "Did not receive the expected number of messages")
        return messages

    # Assertions shared by every winbind Authentication message.
    def _assert_winbind_authentication(self, msg, description, logon_type,
                                       event_id, status, client_domain,
                                       password_type=None):
        """Check the fields common to all winbind Authentication messages.

        password_type is only checked when it is not None.
        """
        self.assertEqual("Authentication", msg["type"])
        auth = msg["Authentication"]
        self.assertTrue(auth["authDescription"].startswith(description))
        self.assertEqual("winbind", auth["serviceDescription"])
        if password_type is not None:
            self.assertEqual(password_type, auth["passwordType"])
        self.assertEqual(logon_type, auth["logonType"])
        self.assertEqual(event_id, auth["eventId"])
        self.assertEqual("unix:", auth["remoteAddress"])
        self.assertEqual("unix:", auth["localAddress"])
        self.assertEqual(client_domain, auth["clientDomain"])
        self.assertEqual(status, auth["status"])
        self.assertEqual(self.credentials.get_username(),
                         auth["clientAccount"])

    # Body shared by test_wbinfo and test_wbinfo_ntlmv1.
    def _check_wbinfo_authentication(self, options, ntlm_password_type,
                                     pam_password_type=None):
        """Run "wbinfo -a" and check the three messages it generates.

        options            -- extra wbinfo options placed before -a
        ntlm_password_type -- expected passwordType of the NTLM_AUTH message
        pam_password_type  -- expected passwordType of the PAM_AUTH message,
                              not checked when None
        """
        pipe = self.dc_watcher()
        user = "{0}%{1}".format(self.credentials.get_username(),
                                self.credentials.get_password())
        command = " ".join(["bin/wbinfo"] + list(options) + ["-a", user])
        try:
            self.check_run(command, msg="wbinfo failed")
        except BlackboxProcessError:
            # The first two authentications are expected to be logged as
            # unsuccessful (see below), so a failing exit status is
            # tolerated here.
            pass

        messages = self._wait_for_winbind_messages("NTLM_AUTH, wbinfo", 3)

        # The 1st message should be an Authentication against the local
        # password database
        # Logon type should be Interactive (2)
        # Event code should be Unsuccessful logon (4625)
        # The status is what the existing winbind implementation returns.
        msg = messages[0]
        self._assert_winbind_authentication(
            msg,
            "PASSDB, wbinfo,",
            logon_type=2,
            event_id=4625,
            status="NT_STATUS_NO_SUCH_USER",
            client_domain="",
            password_type="NTLMv2")

        logon_id = msg["Authentication"]["logonId"]

        # The 2nd message should be a PAM_AUTH with the same logon id as the
        # 1st message
        # Logon type should be NetworkCleartext (8)
        # Event code should be Unsuccessful logon (4625)
        # The status is what the existing winbind implementation returns.
        msg = messages[1]
        self._assert_winbind_authentication(
            msg,
            "PAM_AUTH",
            logon_type=8,
            event_id=4625,
            status="NT_STATUS_NO_SUCH_USER",
            client_domain="",
            password_type=pam_password_type)
        self.assertEqual(logon_id, msg["Authentication"]["logonId"])

        # The 3rd message should be an NTLM_AUTH
        # Logon type should be Network (3)
        # Event code should be successful logon (4624)
        msg = messages[2]
        self._assert_winbind_authentication(
            msg,
            "NTLM_AUTH, wbinfo,",
            logon_type=3,
            event_id=4624,
            status="NT_STATUS_OK",
            client_domain=self.credentials.get_domain(),
            password_type=ntlm_password_type)

        logon_id = msg["Authentication"]["logonId"]

        #
        # Now check the Domain server authentication message
        #
        self.check_domain_server_authentication(pipe, logon_id, "network")

    def test_ntlm_auth(self):
        """ntlm_auth with a plaintext password logs one PAM_AUTH message."""
        pipe = self.dc_watcher()
        COMMAND = "bin/ntlm_auth"
        self.check_run("{0} --username={1} --password={2}".format(
            COMMAND,
            self.credentials.get_username(),
            self.credentials.get_password()),
            msg="ntlm_auth failed")

        messages = self._wait_for_winbind_messages("PAM_AUTH, ntlm_auth", 1)

        # Check the first message it should be an Authentication
        # Logon type should be NetworkCleartext (8)
        # Event code should be Successful logon (4624)
        msg = messages[0]
        self._assert_winbind_authentication(
            msg,
            "PAM_AUTH, ntlm_auth,",
            logon_type=8,
            event_id=4624,
            status="NT_STATUS_OK",
            client_domain=self.credentials.get_domain(),
            password_type="Plaintext")
        self.assertEqual(self.domain, msg["Authentication"]["clientDomain"])
        self.assertIsNone(msg["Authentication"]["workstation"])

        logon_id = msg["Authentication"]["logonId"]

        #
        # Now check the Domain server authentication message
        #
        self.check_domain_server_authentication(pipe, logon_id, "interactive")

    def test_wbinfo(self):
        """wbinfo -a logs PASSDB, PAM_AUTH and an NTLMv2 NTLM_AUTH."""
        self._check_wbinfo_authentication([], "NTLMv2")

    def test_wbinfo_ntlmv1(self):
        """wbinfo --ntlmv1 -a logs PASSDB, PAM_AUTH and an NTLMv1
        NTLM_AUTH."""
        self._check_wbinfo_authentication(["--ntlmv1"],
                                          "NTLMv1",
                                          pam_password_type="Plaintext")