1# Unix SMB/CIFS implementation.
2# Copyright (C) Andrew Bartlett <abartlet@samba.org> 2019
3#
4# This program is free software; you can redistribute it and/or modify
5# it under the terms of the GNU General Public License as published by
6# the Free Software Foundation; either version 3 of the License, or
7# (at your option) any later version.
8#
9# This program is distributed in the hope that it will be useful,
10# but WITHOUT ANY WARRANTY; without even the implied warranty of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12# GNU General Public License for more details.
13#
14# You should have received a copy of the GNU General Public License
15# along with this program.  If not, see <http://www.gnu.org/licenses/>.
16#
17
18"""
19    auth logging tests that exercise winbind
20"""
21
22import json
23import os
24import time
25
26from samba.auth import system_session
27from samba.credentials import Credentials
28from samba.compat import get_string, get_bytes
29from samba.dcerpc.messaging import AUTH_EVENT_NAME, MSG_AUTH_LOG
30from samba.dsdb import UF_NORMAL_ACCOUNT
31from samba.messaging import Messaging
32from samba.param import LoadParm
33from samba.samdb import SamDB
34from samba.tests import delete_force, BlackboxProcessError, BlackboxTestCase
35from samba.tests.auth_log_base import AuthLogTestBase
36
37USER_NAME = "WBALU"
38
39
40class AuthLogTestsWinbind(AuthLogTestBase, BlackboxTestCase):
41
42    #
43    # Helper function to watch for authentication messages on the
44    # Domain Controller.
45    #
46    def dc_watcher(self):
47
48        (r1, w1) = os.pipe()
49        pid = os.fork()
50        if pid != 0:
51            # Parent process return the result socket to the caller.
52            return r1
53
54        # Load the lp context for the Domain Controller, rather than the
55        # member server.
56        config_file = os.environ["DC_SERVERCONFFILE"]
57        lp_ctx = LoadParm()
58        lp_ctx.load(config_file)
59
60        #
61        # Is the message a SamLogon authentication?
62        def is_sam_logon(m):
63            if m is None:
64                return False
65            msg = json.loads(m)
66            return (
67                msg["type"] == "Authentication" and
68                msg["Authentication"]["serviceDescription"] == "SamLogon")
69
70        #
71        # Handler function for received authentication messages.
72        def message_handler(context, msgType, src, message):
73            # Print the message to help debugging the tests.
74            # as it's a JSON message it does not look like a sub-unit message.
75            print(message)
76            self.dc_msgs.append(message)
77
78        # Set up a messaging context to listen for authentication events on
79        # the domain controller.
80        msg_ctx = Messaging((1,), lp_ctx=lp_ctx)
81        msg_ctx.irpc_add_name(AUTH_EVENT_NAME)
82        msg_handler_and_context = (message_handler, None)
83        msg_ctx.register(msg_handler_and_context, msg_type=MSG_AUTH_LOG)
84
85        # Wait for the SamLogon message.
86        # As there could be other SamLogon's in progress we need to collect
87        # all the SamLogons and let the caller match them to the session.
88        self.dc_msgs = []
89        start_time = time.time()
90        while (time.time() - start_time < 1):
91            msg_ctx.loop_once(0.1)
92
93        # Only interested in SamLogon messages, filter out the rest
94        msgs = list(filter(is_sam_logon, self.dc_msgs))
95        if msgs:
96            for m in msgs:
97                os.write(w1, get_bytes(m+"\n"))
98        else:
99            os.write(w1, get_bytes("None\n"))
100        os.close(w1)
101
102        msg_ctx.deregister(msg_handler_and_context, msg_type=MSG_AUTH_LOG)
103        msg_ctx.irpc_remove_name(AUTH_EVENT_NAME)
104
105        os._exit(0)
106
107    # Remove any DCE/RPC ncacn_np messages
108    # these only get triggered once per session, and stripping them out
109    # avoids ordering dependencies in the tests
110    #
111    def filter_messages(self, messages):
112        def keep(msg):
113            if (msg["type"] == "Authorization" and
114                msg["Authorization"]["serviceDescription"] == "DCE/RPC" and
115                msg["Authorization"]["authType"] == "ncacn_np"):
116                    return False
117            else:
118                return True
119
120        return list(filter(keep, messages))
121
122    def setUp(self):
123        super(AuthLogTestsWinbind, self).setUp()
124        self.domain = os.environ["DOMAIN"]
125        self.host = os.environ["SERVER"]
126        self.dc = os.environ["DC_SERVER"]
127        self.lp = self.get_loadparm()
128        self.credentials = self.get_credentials()
129        self.session = system_session()
130
131        self.ldb = SamDB(
132            url="ldap://{0}".format(self.dc),
133            session_info=self.session,
134            credentials=self.credentials,
135            lp=self.lp)
136        self.create_user_account()
137
138    def tearDown(self):
139        super(AuthLogTestsWinbind, self).tearDown()
140        delete_force(self.ldb, self.user_dn)
141
142    #
143    # Create a test user account
144    def create_user_account(self):
145        self.user_pass = self.random_password()
146        self.user_name = USER_NAME
147        self.user_dn = "cn=%s,%s" % (self.user_name, self.ldb.domain_dn())
148
149        # remove the account if it exists, this will happen if a previous test
150        # run failed
151        delete_force(self.ldb, self.user_dn)
152
153        utf16pw = ('"%s"' % get_string(self.user_pass)).encode('utf-16-le')
154        self.ldb.add({
155           "dn": self.user_dn,
156           "objectclass": "user",
157           "sAMAccountName": "%s" % self.user_name,
158           "userAccountControl": str(UF_NORMAL_ACCOUNT),
159           "unicodePwd": utf16pw})
160
161        self.user_creds = Credentials()
162        self.user_creds.guess(self.get_loadparm())
163        self.user_creds.set_password(self.user_pass)
164        self.user_creds.set_username(self.user_name)
165        self.user_creds.set_workstation(self.server)
166
167    #
168    # Check that the domain server received a SamLogon request for the
169    # current logon.
170    #
171    def check_domain_server_authentication(self, pipe, logon_id, description):
172
173        messages = os.read(pipe, 8192)
174        messages = get_string(messages)
175        if len(messages) == 0 or messages == "None":
176            self.fail("No Domain server authentication message")
177
178        #
179        # Look for the SamLogon request matching logon_id
180        msg = None
181        for message in messages.split("\n"):
182            msg = json.loads(get_string(message))
183            if logon_id == msg["Authentication"]["logonId"]:
184                break
185            msg = None
186
187        if msg is None:
188            self.fail("No Domain server authentication message")
189
190        #
191        # Validate that message contains the expected data
192        #
193        self.assertEquals("Authentication", msg["type"])
194        self.assertEquals(logon_id, msg["Authentication"]["logonId"])
195        self.assertEquals("SamLogon",
196                          msg["Authentication"]["serviceDescription"])
197        self.assertEquals(description,
198                          msg["Authentication"]["authDescription"])
199
200    def test_ntlm_auth(self):
201
202        def isLastExpectedMessage(msg):
203            DESC = "PAM_AUTH, ntlm_auth"
204            return (
205                msg["type"] == "Authentication" and
206                msg["Authentication"]["serviceDescription"] == "winbind" and
207                msg["Authentication"]["authDescription"] is not None and
208                msg["Authentication"]["authDescription"].startswith(DESC))
209
210        pipe = self.dc_watcher()
211        COMMAND = "bin/ntlm_auth"
212        self.check_run("{0} --username={1} --password={2}".format(
213            COMMAND,
214            self.credentials.get_username(),
215            self.credentials.get_password()),
216            msg="ntlm_auth failed")
217
218        messages = self.waitForMessages(isLastExpectedMessage)
219        messages = self.filter_messages(messages)
220        expected_messages = 1
221        self.assertEquals(expected_messages,
222                          len(messages),
223                          "Did not receive the expected number of messages")
224
225        # Check the first message it should be an Authentication
226        msg = messages[0]
227        self.assertEquals("Authentication", msg["type"])
228        self.assertTrue(
229            msg["Authentication"]["authDescription"].startswith(
230                "PAM_AUTH, ntlm_auth,"))
231        self.assertEquals("winbind",
232                          msg["Authentication"]["serviceDescription"])
233        self.assertEquals("Plaintext", msg["Authentication"]["passwordType"])
234        # Logon type should be NetworkCleartext
235        self.assertEquals(8, msg["Authentication"]["logonType"])
236        # Event code should be Successful logon
237        self.assertEquals(4624, msg["Authentication"]["eventId"])
238        self.assertEquals("unix:", msg["Authentication"]["remoteAddress"])
239        self.assertEquals("unix:", msg["Authentication"]["localAddress"])
240        self.assertEquals(self.domain, msg["Authentication"]["clientDomain"])
241        self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
242        self.assertEquals(self.credentials.get_username(),
243                          msg["Authentication"]["clientAccount"])
244        self.assertEquals(self.credentials.get_domain(),
245                          msg["Authentication"]["clientDomain"])
246        self.assertTrue(msg["Authentication"]["workstation"] is None)
247
248        logon_id = msg["Authentication"]["logonId"]
249
250        #
251        # Now check the Domain server authentication message
252        #
253        self.check_domain_server_authentication(pipe, logon_id, "interactive")
254
255    def test_wbinfo(self):
256        def isLastExpectedMessage(msg):
257            DESC = "NTLM_AUTH, wbinfo"
258            return (
259                msg["type"] == "Authentication" and
260                msg["Authentication"]["serviceDescription"] == "winbind" and
261                msg["Authentication"]["authDescription"] is not None and
262                msg["Authentication"]["authDescription"].startswith(DESC))
263
264        pipe = self.dc_watcher()
265        COMMAND = "bin/wbinfo"
266        try:
267            self.check_run("{0} -a {1}%{2}".format(
268                COMMAND,
269                self.credentials.get_username(),
270                self.credentials.get_password()),
271                msg="ntlm_auth failed")
272        except BlackboxProcessError:
273            pass
274
275        messages = self.waitForMessages(isLastExpectedMessage)
276        messages = self.filter_messages(messages)
277        expected_messages = 3
278        self.assertEquals(expected_messages,
279                          len(messages),
280                          "Did not receive the expected number of messages")
281
282        # The 1st message should be an Authentication against the local
283        # password database
284        msg = messages[0]
285        self.assertEquals("Authentication", msg["type"])
286        self.assertTrue(msg["Authentication"]["authDescription"].startswith(
287            "PASSDB, wbinfo,"))
288        self.assertEquals("winbind",
289                          msg["Authentication"]["serviceDescription"])
290        # Logon type should be Interactive
291        self.assertEquals(2, msg["Authentication"]["logonType"])
292        # Event code should be Unsuccessful logon
293        self.assertEquals(4625, msg["Authentication"]["eventId"])
294        self.assertEquals("unix:", msg["Authentication"]["remoteAddress"])
295        self.assertEquals("unix:", msg["Authentication"]["localAddress"])
296        self.assertEquals('', msg["Authentication"]["clientDomain"])
297        # This is what the existing winbind implementation returns.
298        self.assertEquals("NT_STATUS_NO_SUCH_USER",
299                          msg["Authentication"]["status"])
300        self.assertEquals("NTLMv2", msg["Authentication"]["passwordType"])
301        self.assertEquals(self.credentials.get_username(),
302                          msg["Authentication"]["clientAccount"])
303        self.assertEquals("", msg["Authentication"]["clientDomain"])
304
305        logon_id = msg["Authentication"]["logonId"]
306
307        # The 2nd message should be a PAM_AUTH with the same logon id as the
308        # 1st message
309        msg = messages[1]
310        self.assertEquals("Authentication", msg["type"])
311        self.assertTrue(msg["Authentication"]["authDescription"].startswith(
312            "PAM_AUTH"))
313        self.assertEquals("winbind",
314                          msg["Authentication"]["serviceDescription"])
315        self.assertEquals(logon_id, msg["Authentication"]["logonId"])
316        # Logon type should be NetworkCleartext
317        self.assertEquals(8, msg["Authentication"]["logonType"])
318        # Event code should be Unsuccessful logon
319        self.assertEquals(4625, msg["Authentication"]["eventId"])
320        self.assertEquals("unix:", msg["Authentication"]["remoteAddress"])
321        self.assertEquals("unix:", msg["Authentication"]["localAddress"])
322        self.assertEquals('', msg["Authentication"]["clientDomain"])
323        # This is what the existing winbind implementation returns.
324        self.assertEquals("NT_STATUS_NO_SUCH_USER",
325                          msg["Authentication"]["status"])
326        self.assertEquals(self.credentials.get_username(),
327                          msg["Authentication"]["clientAccount"])
328        self.assertEquals("", msg["Authentication"]["clientDomain"])
329
330        # The 3rd message should be an NTLM_AUTH
331        msg = messages[2]
332        self.assertEquals("Authentication", msg["type"])
333        self.assertTrue(msg["Authentication"]["authDescription"].startswith(
334            "NTLM_AUTH, wbinfo,"))
335        self.assertEquals("winbind",
336                          msg["Authentication"]["serviceDescription"])
337        # Logon type should be Network
338        self.assertEquals(3, msg["Authentication"]["logonType"])
339        self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
340        # Event code should be successful logon
341        self.assertEquals(4624, msg["Authentication"]["eventId"])
342        self.assertEquals("NTLMv2", msg["Authentication"]["passwordType"])
343        self.assertEquals("unix:", msg["Authentication"]["remoteAddress"])
344        self.assertEquals("unix:", msg["Authentication"]["localAddress"])
345        self.assertEquals(self.credentials.get_username(),
346                          msg["Authentication"]["clientAccount"])
347        self.assertEquals(self.credentials.get_domain(),
348                          msg["Authentication"]["clientDomain"])
349
350        logon_id = msg["Authentication"]["logonId"]
351
352        #
353        # Now check the Domain server authentication message
354        #
355        self.check_domain_server_authentication(pipe, logon_id, "network")
356
357    def test_wbinfo_ntlmv1(self):
358        def isLastExpectedMessage(msg):
359            DESC = "NTLM_AUTH, wbinfo"
360            return (
361                msg["type"] == "Authentication" and
362                msg["Authentication"]["serviceDescription"] == "winbind" and
363                msg["Authentication"]["authDescription"] is not None and
364                msg["Authentication"]["authDescription"].startswith(DESC))
365
366        pipe = self.dc_watcher()
367        COMMAND = "bin/wbinfo"
368        try:
369            self.check_run("{0} --ntlmv1 -a {1}%{2}".format(
370                COMMAND,
371                self.credentials.get_username(),
372                self.credentials.get_password()),
373                msg="ntlm_auth failed")
374        except BlackboxProcessError:
375            pass
376
377        messages = self.waitForMessages(isLastExpectedMessage)
378        messages = self.filter_messages(messages)
379        expected_messages = 3
380        self.assertEquals(expected_messages,
381                          len(messages),
382                          "Did not receive the expected number of messages")
383
384        # The 1st message should be an Authentication against the local
385        # password database
386        msg = messages[0]
387        self.assertEquals("Authentication", msg["type"])
388        self.assertTrue(msg["Authentication"]["authDescription"].startswith(
389            "PASSDB, wbinfo,"))
390        self.assertEquals("winbind",
391                          msg["Authentication"]["serviceDescription"])
392        # Logon type should be Interactive
393        self.assertEquals(2, msg["Authentication"]["logonType"])
394        # Event code should be Unsuccessful logon
395        self.assertEquals(4625, msg["Authentication"]["eventId"])
396        self.assertEquals("unix:", msg["Authentication"]["remoteAddress"])
397        self.assertEquals("unix:", msg["Authentication"]["localAddress"])
398        self.assertEquals('', msg["Authentication"]["clientDomain"])
399        # This is what the existing winbind implementation returns.
400        self.assertEquals("NT_STATUS_NO_SUCH_USER",
401                          msg["Authentication"]["status"])
402        self.assertEquals("NTLMv2", msg["Authentication"]["passwordType"])
403        self.assertEquals(self.credentials.get_username(),
404                          msg["Authentication"]["clientAccount"])
405        self.assertEquals("", msg["Authentication"]["clientDomain"])
406
407        logon_id = msg["Authentication"]["logonId"]
408
409        # The 2nd message should be a PAM_AUTH with the same logon id as the
410        # 1st message
411        msg = messages[1]
412        self.assertEquals("Authentication", msg["type"])
413        self.assertTrue(msg["Authentication"]["authDescription"].startswith(
414            "PAM_AUTH"))
415        self.assertEquals("winbind",
416                          msg["Authentication"]["serviceDescription"])
417        self.assertEquals(logon_id, msg["Authentication"]["logonId"])
418        self.assertEquals("Plaintext", msg["Authentication"]["passwordType"])
419        # Logon type should be NetworkCleartext
420        self.assertEquals(8, msg["Authentication"]["logonType"])
421        # Event code should be Unsuccessful logon
422        self.assertEquals(4625, msg["Authentication"]["eventId"])
423        self.assertEquals("unix:", msg["Authentication"]["remoteAddress"])
424        self.assertEquals("unix:", msg["Authentication"]["localAddress"])
425        self.assertEquals('', msg["Authentication"]["clientDomain"])
426        # This is what the existing winbind implementation returns.
427        self.assertEquals("NT_STATUS_NO_SUCH_USER",
428                          msg["Authentication"]["status"])
429        self.assertEquals(self.credentials.get_username(),
430                          msg["Authentication"]["clientAccount"])
431        self.assertEquals("", msg["Authentication"]["clientDomain"])
432
433        # The 3rd message should be an NTLM_AUTH
434        msg = messages[2]
435        self.assertEquals("Authentication", msg["type"])
436        self.assertTrue(msg["Authentication"]["authDescription"].startswith(
437            "NTLM_AUTH, wbinfo,"))
438        self.assertEquals("winbind",
439                          msg["Authentication"]["serviceDescription"])
440        self.assertEquals("NTLMv1",
441                          msg["Authentication"]["passwordType"])
442        # Logon type should be Network
443        self.assertEquals(3, msg["Authentication"]["logonType"])
444        self.assertEquals("NT_STATUS_OK", msg["Authentication"]["status"])
445        # Event code should be successful logon
446        self.assertEquals(4624, msg["Authentication"]["eventId"])
447        self.assertEquals("unix:", msg["Authentication"]["remoteAddress"])
448        self.assertEquals("unix:", msg["Authentication"]["localAddress"])
449        self.assertEquals(self.credentials.get_username(),
450                          msg["Authentication"]["clientAccount"])
451        self.assertEquals(self.credentials.get_domain(),
452                          msg["Authentication"]["clientDomain"])
453
454        logon_id = msg["Authentication"]["logonId"]
455        #
456        # Now check the Domain server authentication message
457        #
458        self.check_domain_server_authentication(pipe, logon_id, "network")
459