# Unix SMB/CIFS implementation. Tests for samba.kcc.ldif_import_export.
# Copyright (C) Andrew Bartlett 2015
#
# Written by Douglas Bagnall <douglas.bagnall@catalyst.net.nz>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#

"""Tests for samba.kcc.ldif_import_export"""

import samba
import os
import time
import subprocess
import logging
import samba.tests
from samba.kcc import ldif_import_export, KCC
from samba import ldb
from samba.dcerpc import misc


from samba.param import LoadParm
from samba.credentials import Credentials
from samba.samdb import SamDB

# Timestamp handed to every KCC instance built by these tests.
unix_now = int(time.time())

MULTISITE_LDIF = os.path.join(os.environ['SRCDIR_ABS'],
                              "testdata/ldif-utils-test-multisite.ldif")


# UNCONNECTED_LDIF is a single site, unconnected 5DC database that was
# created using samba-tool domain join in testenv.
UNCONNECTED_LDIF = os.path.join(os.environ['SRCDIR_ABS'],
                                "testdata/unconnected-intrasite.ldif")

# (server DN, site name) pairs; the tests below expect exactly these DSAs
# to be found in MULTISITE_LDIF.
MULTISITE_LDIF_DSAS = (
    ("CN=WIN08,CN=Servers,CN=Site-4,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
     "Site-4"),
    ("CN=WIN07,CN=Servers,CN=Site-4,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
     "Site-4"),
    ("CN=WIN06,CN=Servers,CN=Site-3,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
     "Site-3"),
    ("CN=WIN09,CN=Servers,CN=Site-5,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
     "Site-5"),
    ("CN=WIN10,CN=Servers,CN=Site-5,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
     "Site-5"),
    ("CN=WIN02,CN=Servers,CN=Site-2,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
     "Site-2"),
    ("CN=WIN04,CN=Servers,CN=Site-2,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
     "Site-2"),
    ("CN=WIN03,CN=Servers,CN=Site-2,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
     "Site-2"),
    ("CN=WIN05,CN=Servers,CN=Site-2,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
     "Site-2"),
    ("CN=WIN01,CN=Servers,CN=Default-First-Site-Name,CN=Sites,CN=Configuration,DC=ad,DC=samba,DC=example,DC=com",
     "Default-First-Site-Name"),
)


class _LdifTestCase(samba.tests.TestCaseInTempDir):
    """Fixture shared by the test classes in this file.

    Sets up ``self.lp`` (a LoadParm) and ``self.creds`` (Credentials guessed
    from that LoadParm) and provides a guarded helper for deleting the
    temporary files the tests create.
    """

    def setUp(self):
        super(_LdifTestCase, self).setUp()
        self.lp = LoadParm()
        self.creds = Credentials()
        self.creds.guess(self.lp)

    def remove_files(self, *files):
        """Unlink each path in *files*.

        Raises AssertionError, without unlinking that path or any later
        one, if a path does not start with ``self.tempdir``.
        """
        for f in files:
            # Raised explicitly rather than via an ``assert`` statement so
            # the guard in front of os.unlink() survives ``python -O``.
            if not f.startswith(self.tempdir):
                raise AssertionError("refusing to remove %r: it is not "
                                     "under %r" % (f, self.tempdir))
            os.unlink(f)


class LdifImportExportTests(_LdifTestCase):
    """Tests for the ldif_to_samdb / samdb_to_ldif_file helpers."""

    # The server that test_ldif_to_samdb expects to be the local DSA when
    # MULTISITE_LDIF is imported without forcing one.
    DEFAULT_DSA = ("CN=WIN01,CN=Servers,CN=Default-First-Site-Name,CN=Sites,"
                   "CN=Configuration,DC=ad,DC=samba,DC=example,DC=com")

    def _assert_local_dsa(self, samdb, dsa):
        """Assert that *samdb* reports *dsa*'s NTDS Settings object as its own.

        Checks both that the objectGUID of "CN=NTDS Settings,<dsa>" equals
        samdb.get_ntds_GUID() and that the rootDSE dsServiceName is that
        same DN.
        """
        ntds_dn = ldb.Dn(samdb, "CN=NTDS Settings," + dsa)
        res = samdb.search(ntds_dn,
                           scope=ldb.SCOPE_BASE, attrs=["objectGUID"])

        ntds_guid = misc.GUID(samdb.get_ntds_GUID())
        self.assertEqual(misc.GUID(res[0]["objectGUID"][0]), ntds_guid)

        service_name_res = samdb.search(base="",
                                        scope=ldb.SCOPE_BASE,
                                        attrs=["dsServiceName"])
        dn = ldb.Dn(samdb,
                    service_name_res[0]["dsServiceName"][0].decode('utf8'))
        self.assertEqual(dn, ntds_dn)

    def test_write_search_url(self):
        # Placeholder: nothing is exercised here yet.
        pass

    def test_ldif_to_samdb(self):
        """Import MULTISITE_LDIF and check which DSA the result is local to."""
        dburl = os.path.join(self.tempdir, "ldap")
        samdb = ldif_import_export.ldif_to_samdb(dburl, self.lp,
                                                 MULTISITE_LDIF)
        # Clean up even when an assertion fails, so the database file is
        # not left behind in the temporary directory.
        try:
            self.assertIsInstance(samdb, SamDB)
            self._assert_local_dsa(samdb, self.DEFAULT_DSA)
        finally:
            self.remove_files(dburl)

    def test_ldif_to_samdb_forced_local_dsa(self):
        """Import MULTISITE_LDIF once per known DSA, forcing it as local."""
        for dsa, site in MULTISITE_LDIF_DSAS:
            dburl = os.path.join(self.tempdir, "ldif-to-samba-forced-local-dsa"
                                 "-%s" % dsa)
            samdb = ldif_import_export.ldif_to_samdb(dburl, self.lp,
                                                     MULTISITE_LDIF,
                                                     forced_local_dsa=dsa)
            try:
                self.assertIsInstance(samdb, SamDB)
                self.assertEqual(samdb.server_site_name(), site)
                self._assert_local_dsa(samdb, dsa)
            finally:
                self.remove_files(dburl)

    def test_samdb_to_ldif_file(self):
        """Round-trip: LDIF -> samdb -> LDIF file -> samdb."""
        dburl = os.path.join(self.tempdir, "ldap")
        dburl2 = os.path.join(self.tempdir, "ldap_roundtrip")
        ldif_file = os.path.join(self.tempdir, "ldif")
        # Each path is recorded only once the step that creates it has
        # returned, so the cleanup never tries to unlink a missing file.
        created = []
        try:
            samdb = ldif_import_export.ldif_to_samdb(dburl, self.lp,
                                                     MULTISITE_LDIF)
            created.append(dburl)
            self.assertIsInstance(samdb, SamDB)
            ldif_import_export.samdb_to_ldif_file(samdb, dburl,
                                                  lp=self.lp, creds=None,
                                                  ldif_file=ldif_file)
            created.append(ldif_file)
            self.assertGreater(os.path.getsize(ldif_file), 1000,
                               "LDIF should be larger than 1000 bytes")
            samdb = ldif_import_export.ldif_to_samdb(dburl2, self.lp,
                                                     ldif_file)
            created.append(dburl2)
            self.assertIsInstance(samdb, SamDB)
            res = samdb.search(ldb.Dn(samdb,
                                      "CN=NTDS Settings," + self.DEFAULT_DSA),
                               scope=ldb.SCOPE_BASE, attrs=["objectGUID"])
            # The re-imported database must still contain the DSA's
            # NTDS Settings object.
            self.assertEqual(len(res), 1)
        finally:
            self.remove_files(*created)


class KCCMultisiteLdifTests(_LdifTestCase):
    """Tests that run the KCC against databases imported from LDIF."""

    def _get_kcc(self, name, readonly=False, verify=False, dot_file_dir=None):
        """Return a KCC that has imported MULTISITE_LDIF.

        *name* is accepted but not used. The temporary database created by
        the import is unlinked again before returning.
        """
        # Note that setting read-only to False won't affect the ldif,
        # only the temporary database that is created from it.
        my_kcc = KCC(unix_now, readonly=readonly, verify=verify,
                     dot_file_dir=dot_file_dir)
        tmpdb = os.path.join(self.tempdir, 'tmpdb')
        my_kcc.import_ldif(tmpdb, self.lp, MULTISITE_LDIF)
        self.remove_files(tmpdb)
        return my_kcc

    def test_list_dsas(self):
        """Check that list_dsas() yields exactly the DSAs of the LDIF."""
        my_kcc = self._get_kcc('test-list')
        dsas = set(my_kcc.list_dsas())
        expected_dsas = set(x[0] for x in MULTISITE_LDIF_DSAS)
        self.assertEqual(dsas, expected_dsas)

    def test_verify(self):
        """Check that the KCC generates graphs that pass its own verify
        option.
        """
        my_kcc = self._get_kcc('test-verify', verify=True)
        tmpdb = os.path.join(self.tempdir, 'verify-tmpdb')
        my_kcc.import_ldif(tmpdb, self.lp, MULTISITE_LDIF)

        try:
            my_kcc.run(None,
                       self.lp, self.creds,
                       attempt_live_connections=False)
        finally:
            self.remove_files(tmpdb)

    def test_unconnected_db(self):
        """Check how the KCC behaves on an unconnected db.

        A GraphError raised by run() is accepted, as is a run that raises
        nothing; any other exception fails the test.
        """
        my_kcc = self._get_kcc('test-verify', verify=True)
        tmpdb = os.path.join(self.tempdir, 'verify-tmpdb')
        my_kcc.import_ldif(tmpdb, self.lp, UNCONNECTED_LDIF)

        try:
            my_kcc.run(None,
                       self.lp, self.creds,
                       attempt_live_connections=False)
        except samba.kcc.graph_utils.GraphError:
            pass
        except Exception as e:
            # Report what was actually raised instead of hiding it.
            self.fail("Did not expect this error: %r" % (e,))
        finally:
            self.remove_files(tmpdb)

    def test_dotfiles(self):
        """Check that KCC writes dot_files when asked.
        """
        my_kcc = self._get_kcc('test-dotfiles', dot_file_dir=self.tempdir)
        tmpdb = os.path.join(self.tempdir, 'dotfile-tmpdb')
        my_kcc.import_ldif(tmpdb, self.lp, MULTISITE_LDIF)
        try:
            my_kcc.run(None,
                       self.lp, self.creds,
                       attempt_live_connections=False)

            dot = '/usr/bin/dot'
            # Probe for a working dot binary once, not once per file.
            have_dot = (os.path.exists(dot) and
                        subprocess.call([dot, '-?']) == 0)
            for fn in os.listdir(self.tempdir):
                if fn.endswith('.dot'):
                    ffn = os.path.join(self.tempdir, fn)
                    if have_dot:
                        r = subprocess.call([dot, '-Tcanon', ffn])
                        self.assertEqual(r, 0)

                    # even if dot is not there, at least check the file is
                    # non-empty
                    size = os.stat(ffn).st_size
                    self.assertNotEqual(size, 0)
        finally:
            # Re-list the directory so that .dot files are removed even if
            # the checks above stopped part-way through.
            dot_files = [os.path.join(self.tempdir, fn)
                         for fn in os.listdir(self.tempdir)
                         if fn.endswith('.dot')]
            self.remove_files(tmpdb, *dot_files)