1# Unix SMB/CIFS implementation. Tests for samba.kcc.ldif_import_export.
2# Copyright (C) Andrew Bartlett 2015
3#
4# Written by Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
5#
6# This program is free software; you can redistribute it and/or modify
7# it under the terms of the GNU General Public License as published by
8# the Free Software Foundation; either version 3 of the License, or
9# (at your option) any later version.
10#
11# This program is distributed in the hope that it will be useful,
12# but WITHOUT ANY WARRANTY; without even the implied warranty of
13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14# GNU General Public License for more details.
15#
16# You should have received a copy of the GNU General Public License
17# along with this program.  If not, see <http://www.gnu.org/licenses/>.
18#
19
20"""Tests for samba.kcc.ldif_import_export"""
21
22import samba
23import os
24import time
25import subprocess
26import logging
27import samba.tests
28from samba.kcc import ldif_import_export, KCC
29from samba import ldb
30from samba.dcerpc import misc
31
32
33from samba.param import LoadParm
34from samba.credentials import Credentials
35from samba.samdb import SamDB
36
def _srcdir_path(relative):
    """Return *relative* joined onto the directory named by $SRCDIR_ABS.

    Raises KeyError if SRCDIR_ABS is not set in the environment.
    """
    return os.path.join(os.environ['SRCDIR_ABS'], relative)


# Wall-clock time in whole seconds, sampled once when this module is
# imported; the tests below hand it to KCC() as its first argument.
unix_now = int(time.time())

# LDIF fixture describing the multi-site test topology.
MULTISITE_LDIF = _srcdir_path("testdata/ldif-utils-test-multisite.ldif")

# LDIF fixture for a single-site, unconnected 5-DC database; it was
# produced with "samba-tool domain join" in testenv.
UNCONNECTED_LDIF = _srcdir_path("testdata/unconnected-intrasite.ldif")
47
# Every server DN in the multisite fixture has the same shape and differs
# only in the server's CN and the CN of the site that contains it.
_MULTISITE_DSA_DN = ("CN=%s,CN=Servers,CN=%s,CN=Sites,CN=Configuration,"
                     "DC=ad,DC=samba,DC=example,DC=com")

# (server DN, site name) pairs, one per DSA listed for the multisite LDIF.
MULTISITE_LDIF_DSAS = tuple(
    (_MULTISITE_DSA_DN % (_host, _site), _site)
    for _host, _site in (
        ("WIN08", "Site-4"),
        ("WIN07", "Site-4"),
        ("WIN06", "Site-3"),
        ("WIN09", "Site-5"),
        ("WIN10", "Site-5"),
        ("WIN02", "Site-2"),
        ("WIN04", "Site-2"),
        ("WIN03", "Site-2"),
        ("WIN05", "Site-2"),
        ("WIN01", "Default-First-Site-Name"),
    )
)
70
71
class LdifImportExportTests(samba.tests.TestCaseInTempDir):
    """Tests for the helpers in samba.kcc.ldif_import_export.

    Each test builds its databases under ``self.tempdir`` and unlinks
    them again before returning, including when an assertion fails.
    """

    def setUp(self):
        """Create the LoadParm and guessed Credentials used by the tests."""
        super(LdifImportExportTests, self).setUp()
        self.lp = LoadParm()
        self.creds = Credentials()
        self.creds.guess(self.lp)

    def remove_files(self, *files):
        """Unlink every path in *files*.

        Raises AssertionError, before unlinking the offending path, if a
        path does not start with ``self.tempdir``.
        """
        for f in files:
            # An explicit check rather than an ``assert`` statement, so the
            # guard is still enforced when Python runs with -O.
            if not f.startswith(self.tempdir):
                raise AssertionError("refusing to remove %r: not inside %r"
                                     % (f, self.tempdir))
            os.unlink(f)

    def _assert_local_dsa(self, samdb, dsa):
        """Assert that *samdb* regards the server DN *dsa* as its own DSA.

        Two things are checked: the objectGUID of the server's
        "CN=NTDS Settings" child equals samdb.get_ntds_GUID(), and the
        dsServiceName returned by a base search on "" is that same DN.
        """
        ntds_dn = ldb.Dn(samdb, "CN=NTDS Settings," + dsa)
        res = samdb.search(ntds_dn,
                           scope=ldb.SCOPE_BASE, attrs=["objectGUID"])

        ntds_guid = misc.GUID(samdb.get_ntds_GUID())
        self.assertEqual(misc.GUID(res[0]["objectGUID"][0]), ntds_guid)

        service_name_res = samdb.search(base="",
                                        scope=ldb.SCOPE_BASE,
                                        attrs=["dsServiceName"])
        dn = ldb.Dn(samdb,
                    service_name_res[0]["dsServiceName"][0].decode('utf8'))
        self.assertEqual(dn, ntds_dn)

    def test_write_search_url(self):
        # Placeholder: nothing is exercised here yet.
        pass

    def test_ldif_to_samdb(self):
        """Import the multisite LDIF and check the default local DSA."""
        dburl = os.path.join(self.tempdir, "ldap")
        samdb = ldif_import_export.ldif_to_samdb(dburl, self.lp,
                                                 MULTISITE_LDIF)
        try:
            self.assertIsInstance(samdb, SamDB)

            dsa = ("CN=WIN01,CN=Servers,CN=Default-First-Site-Name,CN=Sites,"
                   "CN=Configuration,DC=ad,DC=samba,DC=example,DC=com")
            self._assert_local_dsa(samdb, dsa)
        finally:
            # Clean up even if an assertion above failed.
            self.remove_files(dburl)

    def test_ldif_to_samdb_forced_local_dsa(self):
        """Import the multisite LDIF once per DSA, forcing it to be local."""
        for dsa, site in MULTISITE_LDIF_DSAS:
            dburl = os.path.join(self.tempdir, "ldif-to-samba-forced-local-dsa"
                                 "-%s" % dsa)
            samdb = ldif_import_export.ldif_to_samdb(dburl, self.lp,
                                                     MULTISITE_LDIF,
                                                     forced_local_dsa=dsa)
            try:
                self.assertIsInstance(samdb, SamDB)
                self.assertEqual(samdb.server_site_name(), site)
                self._assert_local_dsa(samdb, dsa)
            finally:
                self.remove_files(dburl)

    def test_samdb_to_ldif_file(self):
        """Round-trip: LDIF -> SamDB -> LDIF file -> SamDB."""
        dburl = os.path.join(self.tempdir, "ldap")
        dburl2 = os.path.join(self.tempdir, "ldap_roundtrip")
        ldif_file = os.path.join(self.tempdir, "ldif")

        # Paths are recorded only once the call that produces them has
        # returned, so the cleanup never tries to unlink a missing file.
        created = []
        try:
            samdb = ldif_import_export.ldif_to_samdb(dburl, self.lp,
                                                     MULTISITE_LDIF)
            created.append(dburl)
            self.assertIsInstance(samdb, SamDB)

            ldif_import_export.samdb_to_ldif_file(samdb, dburl,
                                                  lp=self.lp, creds=None,
                                                  ldif_file=ldif_file)
            created.append(ldif_file)
            self.assertGreater(os.path.getsize(ldif_file), 1000,
                               "LDIF should be larger than 1000 bytes")

            samdb = ldif_import_export.ldif_to_samdb(dburl2, self.lp,
                                                     ldif_file)
            created.append(dburl2)
            self.assertIsInstance(samdb, SamDB)

            dsa = ("CN=WIN01,CN=Servers,CN=Default-First-Site-Name,CN=Sites,"
                   "CN=Configuration,DC=ad,DC=samba,DC=example,DC=com")
            res = samdb.search(ldb.Dn(samdb, "CN=NTDS Settings," + dsa),
                               scope=ldb.SCOPE_BASE, attrs=["objectGUID"])
            # The NTDS Settings object must be present in the
            # re-imported database.
            self.assertEqual(len(res), 1)
        finally:
            self.remove_files(*created)
155
156
class KCCMultisiteLdifTests(samba.tests.TestCaseInTempDir):
    """Run the KCC against databases imported from the LDIF fixtures.

    Temporary databases and generated dot files live in ``self.tempdir``
    and are unlinked before each test returns.
    """

    def setUp(self):
        """Create the LoadParm and guessed Credentials used by the tests."""
        super(KCCMultisiteLdifTests, self).setUp()
        self.lp = LoadParm()
        self.creds = Credentials()
        self.creds.guess(self.lp)

    def remove_files(self, *files):
        """Unlink every path in *files*.

        Raises AssertionError, before unlinking the offending path, if a
        path does not start with ``self.tempdir``.
        """
        for f in files:
            # An explicit check rather than an ``assert`` statement, so the
            # guard is still enforced when Python runs with -O.
            if not f.startswith(self.tempdir):
                raise AssertionError("refusing to remove %r: not inside %r"
                                     % (f, self.tempdir))
            os.unlink(f)

    def _get_kcc(self, name, readonly=False, verify=False, dot_file_dir=None):
        """Return a KCC that has imported MULTISITE_LDIF.

        *name* is accepted but not used.  The remaining arguments are
        passed straight through to the KCC constructor.
        """
        # Note that setting read-only to False won't affect the ldif,
        # only the temporary database that is created from it.
        my_kcc = KCC(unix_now, readonly=readonly, verify=verify,
                     dot_file_dir=dot_file_dir)
        tmpdb = os.path.join(self.tempdir, 'tmpdb')
        my_kcc.import_ldif(tmpdb, self.lp, MULTISITE_LDIF)
        self.remove_files(tmpdb)
        return my_kcc

    def test_list_dsas(self):
        """Check that list_dsas() reports exactly the DSAs in the fixture."""
        my_kcc = self._get_kcc('test-list')
        dsas = set(my_kcc.list_dsas())
        expected_dsas = {dsa for dsa, _site in MULTISITE_LDIF_DSAS}
        self.assertEqual(dsas, expected_dsas)

    def test_verify(self):
        """Check that the KCC generates graphs that pass its own verify
        option.
        """
        my_kcc = self._get_kcc('test-verify', verify=True)
        tmpdb = os.path.join(self.tempdir, 'verify-tmpdb')
        my_kcc.import_ldif(tmpdb, self.lp, MULTISITE_LDIF)

        try:
            my_kcc.run(None,
                       self.lp, self.creds,
                       attempt_live_connections=False)
        finally:
            # Remove the database even when run() raises.
            self.remove_files(tmpdb)

    def test_unconnected_db(self):
        """Run the verifying KCC on the unconnected database.

        A GraphError from run() is tolerated; any other exception fails
        the test.
        """
        my_kcc = self._get_kcc('test-verify', verify=True)
        tmpdb = os.path.join(self.tempdir, 'verify-tmpdb')
        my_kcc.import_ldif(tmpdb, self.lp, UNCONNECTED_LDIF)

        try:
            my_kcc.run(None,
                       self.lp, self.creds,
                       attempt_live_connections=False)
        except samba.kcc.graph_utils.GraphError:
            pass
        except Exception as e:
            # Report what actually went wrong instead of hiding it.
            self.fail("Did not expect this error: %r" % (e,))
        finally:
            self.remove_files(tmpdb)

    def _dot_files(self):
        """Return the full paths of the ``*.dot`` files in self.tempdir."""
        return [os.path.join(self.tempdir, fn)
                for fn in os.listdir(self.tempdir)
                if fn.endswith('.dot')]

    def test_dotfiles(self):
        """Check that KCC writes dot_files when asked.
        """
        my_kcc = self._get_kcc('test-dotfiles', dot_file_dir=self.tempdir)
        tmpdb = os.path.join(self.tempdir, 'dotfile-tmpdb')
        my_kcc.import_ldif(tmpdb, self.lp, MULTISITE_LDIF)

        try:
            my_kcc.run(None,
                       self.lp, self.creds,
                       attempt_live_connections=False)

            dot_files = self._dot_files()
            dot = '/usr/bin/dot'
            # Probe for a working dot binary once, and only when there is
            # at least one file to feed it.
            have_dot = (bool(dot_files) and os.path.exists(dot) and
                        subprocess.call([dot, '-?']) == 0)

            for ffn in dot_files:
                if have_dot:
                    r = subprocess.call([dot, '-Tcanon', ffn])
                    self.assertEqual(r, 0)

                # even if dot is not there, at least check the file is
                # non-empty
                size = os.stat(ffn).st_size
                self.assertNotEqual(size, 0)
        finally:
            # Remove the database and every dot file, including on failure.
            self.remove_files(tmpdb, *self._dot_files())
241