1#!/usr/bin/env python3 2# Unix SMB/CIFS implementation. 3# Tests for smbcquotas 4# Copyright (C) Noel Power 2017 5 6# This program is free software; you can redistribute it and/or modify 7# it under the terms of the GNU General Public License as published by 8# the Free Software Foundation; either version 3 of the License, or 9# (at your option) any later version. 10 11# This program is distributed in the hope that it will be useful, 12# but WITHOUT ANY WARRANTY; without even the implied warranty of 13# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14# GNU General Public License for more details. 15 16# You should have received a copy of the GNU General Public License 17# along with this program. If not, see <http://www.gnu.org/licenses/>. 18 19import os, subprocess, sys 20import traceback 21import logging 22import shutil 23 24USER_QUOTAS = 1 25USER_DEFAULT_QUOTAS = 2 26GROUP_QUOTAS = 3 27GROUP_DEFAULT_QUOTAS = 4 28BLOCK_SIZE = 1024 29DEFAULT_SOFTLIM = 2 30DEFAULT_HARDLIM = 4 31 32class test_env: 33 def __init__(self): 34 self.server = None 35 self.domain = None 36 self.username = None 37 self.password = None 38 self.envdir = None 39 self.quota_db = None 40 self.smbcquotas = None 41 self.users = [] 42 43class user_info: 44 def __init__(self): 45 self.uid = 0 46 self.username = "" 47 self.softlim = 0 48 self.hardlim = 0 49 50class Quota: 51 def __init__(self): 52 self.flags = 0 53 self.quotatype = USER_DEFAULT_QUOTAS 54 self.uid = 0 55 self.usedblocks = 0 56 self.softlimit = 0 57 self.hardlimit = 0 58 self.hardlimit = 0 59 self.usedinodes = 0 60 self.slimitinodes = 0 61 self.hlimitinodes = 0 62 63def init_quota_db(users, output_file): 64 filecontents = open(output_file,"w+") 65 lines = "" 66 default_values = "0 " + str(DEFAULT_SOFTLIM) + " " + str(DEFAULT_HARDLIM) + " 0 0 0" 67 for user in users: 68 lines = lines + user.uid + " " + default_values + "\n" 69 filecontents.write(lines) 70 filecontents.close() 71 72def load_quotas(input_file): 73 fileContents = open(input_file,"r") 74 lineno = 0 75 quotas = [] 76 for line in fileContents: 77 if line.strip().startswith("#"): 78 continue 79 content = line.strip().split() 80 quota = Quota() 81 if len(content) < 7: 82 logging.debug("ignoring line %d, doesn't have enough fields\n"%lineno) 83 else: 84 quota.flags = 2 85 quota.uid = content[0] 86 quota.usedblocks = content[1] 87 quota.softlimit = content[2] 88 quota.hardlimit = content[3] 89 quota.usedinodes = content[4] 90 quota.slimitinodes = content[5] 91 quota.hlimitinodes = content[6] 92 quotas.append(quota) 93 94 fileContents.close() 95 return quotas 96 97def get_quotas(uid, quota_list): 98 for quota in quota_list: 99 if quota.uid == uid: 100 return quota 101 return None 102 103def get_users(): 104 output = subprocess.Popen(['getent', 'passwd'], 105 stdout=subprocess.PIPE).communicate()[0].decode("utf-8").split('\n') 106 users = [] 107 for line in output: 108 info = line.split(':') 109 if len(info) > 3 and info[0]: 110 user = user_info() 111 user.username = info[0] 112 user.uid = info[2] 113 logging.debug("Adding user ->%s<-\n"%user.username) 114 users.append(user) 115 return users 116 117 118 119def smbcquota_output_to_userinfo(output): 120 infos = [] 121 for line in output: 122 if len(line) > 1: 123 username = line.strip(':').split()[0] 124 quota_info = line.split(':')[1].split('/') 125 if len(quota_info) > 2: 126 info = user_info() 127 info.username = username.strip() 128 info.softlim = int(quota_info[1].strip()) / BLOCK_SIZE 129 info.hardlim = int(quota_info[2].strip()) / BLOCK_SIZE 130 infos.append(info) 131 return infos 132 133def check_quota_limits(infos, softlim, hardlim): 134 if len(infos) < 1: 135 logging.debug("no users info to check :-(\n") 136 return False 137 for info in infos: 138 if int(info.softlim) != softlim: 139 logging.debug("expected softlimit %s got ->%s<-\n"%(softlim, info.softlim)) 140 return False 141 if int(info.hardlim) != hardlim: 142 logging.debug("expected hardlimit limit %s got %s\n"%(hardlim,info.hardlim)) 143 return False 144 return True 145 146class test_base: 147 def __init__(self, env): 148 self.env = env 149 def run(self, protocol): 150 pass 151 152class listtest(test_base): 153 def run(self, protocol): 154 init_quota_db(self.env.users, self.env.quota_db) 155 quotas = load_quotas(self.env.quota_db) 156 args = [self.env.smbcquotas]; 157 remaining_args = ['-U' + self.env.username + "%" + self.env.password, '-L', '//' + self.env.server + '/quotadir'] 158 if protocol == 'smb2': 159 args.append('-m smb2') 160 args.extend(remaining_args) 161 output = subprocess.Popen([self.env.smbcquotas, '-U' + self.env.username + "%" + self.env.password, '-L', '//' + self.env.server + '/quotadir'], stdout=subprocess.PIPE).communicate()[0].decode("utf-8").split('\n') 162 infos = smbcquota_output_to_userinfo(output) 163 return check_quota_limits(infos, DEFAULT_SOFTLIM, DEFAULT_HARDLIM) 164def get_uid(name, users): 165 for user in users: 166 if user.username == name: 167 return user.uid 168 return None 169 170class gettest(test_base): 171 def run(self, protocol): 172 init_quota_db(self.env.users, self.env.quota_db) 173 quotas = load_quotas(self.env.quota_db) 174 uid = get_uid(self.env.username, self.env.users) 175 output = subprocess.Popen([self.env.smbcquotas, '-U' + self.env.username + "%" + self.env.password, '-u' + self.env.username, '//' + self.env.server + '/quotadir'], stdout=subprocess.PIPE).communicate()[0].decode("utf-8").split('\n') 176 user_infos = smbcquota_output_to_userinfo(output) 177 db_user_info = get_quotas(uid, quotas) 178 # double check, we compare the results from the db file 179 # the quota script the server uses compared to what 180 # smbcquota is telling us 181 return check_quota_limits(user_infos, int(db_user_info.softlimit), int(db_user_info.hardlimit)) 182 183class settest(test_base): 184 def run(self, protocol): 185 init_quota_db(self.env.users, self.env.quota_db) 186 quotas = load_quotas(self.env.quota_db) 187 uid = get_uid(self.env.username, self.env.users) 188 old_db_user_info = get_quotas(uid, quotas) 189 190 #increase limits by 2 blocks 191 new_soft_limit = (int(old_db_user_info.softlimit) + 2) * BLOCK_SIZE 192 new_hard_limit = (int(old_db_user_info.hardlimit) + 2) * BLOCK_SIZE 193 194 new_limits = "UQLIM:%s:%d/%d"%(self.env.username, new_soft_limit, new_hard_limit) 195 logging.debug("setting new limits %s"%new_limits) 196 197 output = subprocess.Popen([self.env.smbcquotas, '-U' + self.env.username + "%" + self.env.password, '//' + self.env.server + '/quotadir', '-S', new_limits], stdout=subprocess.PIPE).communicate()[0].decode("utf-8").split('\n') 198 logging.debug("output from smbcquota is %s"%output) 199 user_infos = smbcquota_output_to_userinfo(output) 200 return check_quota_limits(user_infos, new_soft_limit / BLOCK_SIZE, new_hard_limit / BLOCK_SIZE) 201 202# map of tests 203subtest_descriptions = { 204 "list test" : listtest, 205 "get test" : gettest, 206 "set test" : settest 207} 208 209def main(): 210 logging.basicConfig(format='%(asctime)s %(message)s', level=logging.DEBUG) 211 212 logging.debug("got args %s\n"%str(sys.argv)) 213 214 if len(sys.argv) < 7: 215 logging.debug ("Usage: test_smbcquota.py server domain username password envdir smbcquotas\n") 216 sys.exit(1) 217 env = test_env() 218 env.server = sys.argv[1] 219 env.domain = sys.argv[2] 220 env.username = sys.argv[3] 221 env.password = sys.argv[4] 222 env.envdir = sys.argv[5] 223 env.smbcquotas = sys.argv[6] 224 quota_script = os.path.join(os.path.dirname(sys.argv[0]), 225 "getset_quota.py") 226 #copy the quota script to the evironment 227 shutil.copy2(quota_script, env.envdir) 228 229 env.quota_db = os.path.join(env.envdir, "quotas.db") 230 env.users = get_users() 231 for protocol in ['smb1', 'smb2']: 232 for key in subtest_descriptions.keys(): 233 test = subtest_descriptions[key](env) 234 logging.debug("running subtest '%s' using protocol '%s'\n"%(key,protocol)) 235 result = test.run(protocol) 236 if result == False: 237 logging.debug("subtest '%s' for '%s' failed\n"%(key,protocol)) 238 sys.exit(1) 239 else: 240 logging.debug("subtest '%s' for '%s' passed\n"%(key,protocol)) 241 sys.exit(0) 242 243if __name__ == '__main__': 244 main() 245