import csv
import json
import logging
import os
import re

log = logging.getLogger(__name__)

# NOTE(review): `_ttp_` is referenced throughout this module but is not defined
# in it; presumably the host package injects it into the module globals before
# any loader is called - confirm against the caller.

# String types accepted as "path or text" input. On Python 2 text may also be
# `unicode`; on Python 3 that name does not exist, hence the NameError guard.
try:
    _STRING_TYPES = (str, unicode)  # noqa: F821 - Python 2 only
except NameError:
    _STRING_TYPES = (str,)


def _read_text_file(file_path):
    """Return the text content of ``file_path`` or None if it cannot be decoded.

    On Python 2 the file is opened without an explicit encoding, otherwise it
    is opened as UTF-8. A ``UnicodeDecodeError`` is logged as a warning and
    reported to the caller as ``None``.
    """
    try:
        if _ttp_["python_major_version"] == 2:  # noqa: F821
            with open(file_path, "r") as file_obj:
                return file_obj.read()
        with open(file_path, "r", encoding="utf-8") as file_obj:
            return file_obj.read()
    except UnicodeDecodeError:
        log.warning(
            'ttp_utils.load_files: Unicode read error, file "{}"'.format(file_path)
        )
    return None


def load_files(path, extensions=None, filters=None, read=False):
    """
    Load files from path, filtering file names with regex filters
    and extensions.

    Args:
        path (str): OS path to a file or directory, a ``ttp://`` template
            reference, or the text data itself; any non-string object is
            treated as structured data
        extensions (list): list of file extensions to keep, like
            ['txt', 'log', 'conf']; only applied to directory listings
        filters (list): list of regexes, a file name has to match all of
            them to be kept; only applied to directory listings
        read (bool): if False return file names, if True return files' content

    Returns:
        List of ``(type, data)`` tuples where type is one of
        ``structured_data``, ``text_data`` or ``file_name``. Files that
        cannot be decoded are skipped, so the list can be empty.
    """
    # structured, non text, data is returned as is to be processed
    # within input macro/function
    if not isinstance(path, _STRING_TYPES):
        return [("structured_data", path)]

    # check if path is a reference to template in ttp_templates collection
    if path.startswith("ttp://"):
        from ttp_templates import get_template

        return [("text_data", get_template(path=path.replace("ttp://", "")))]

    # need to use path[:5000] cause if path is actually text of the template
    # and has length more then X symbols, os.path will choke with "path too long"
    # error, hence the safe-assumption that no os path exists longer then 5000 symbols
    if os.path.isfile(path[:5000]):
        if not read:
            return [("file_name", path)]
        text = _read_text_file(path)
        # return an empty list, not None, when the file cannot be decoded
        return [] if text is None else [("text_data", text)]

    if os.path.isdir(path[:5000]):
        # only regular files directly inside the directory are considered
        files = [f for f in os.listdir(path) if os.path.isfile(os.path.join(path, f))]
        if extensions:
            files = [f for f in files if f.split(".")[-1] in extensions]
        for pattern in filters or ():
            files = [f for f in files if re.search(pattern, f)]
        file_paths = [os.path.join(path, f) for f in files]
        if not read:
            return [("file_name", file_path) for file_path in file_paths]
        ret = []
        for file_path in file_paths:
            text = _read_text_file(file_path)
            if text is not None:
                ret.append(("text_data", text))
        return ret

    # not a template reference, file or directory - path is the text itself
    return [("text_data", path)]


def load_struct(text_data="", **kwargs):
    """Load structured data from text and/or from file(s) given in include.

    Args:
        text_data (str): text to load structured data from
        load (str): name of the loader to use, case insensitive, one of
            ini, python, yaml, json, csv, text; default is python
        include (str): passed to the loader, which hands it to ``load_files``
        **kwargs: passed through to the loader function unchanged

    Returns:
        None if ``text_data`` is empty and no ``include`` given, otherwise
        whatever the selected loader function returns.

    Raises:
        KeyError: if ``load`` is not one of the supported loader names
    """
    loader = kwargs.get("load", "python").lower()
    include = kwargs.get("include", None)
    if not text_data and include is None:
        return None
    elif text_data is None and include:
        text_data = ""
    # dispatcher:
    loaders = {
        "ini": load_ini,
        "python": load_python,
        "yaml": load_yaml,
        "json": load_json,
        "csv": load_csv,
        "text": load_text,
    }
    # run function to load structured data
    return loaders[loader](text_data, **kwargs)


def load_text(text_data, include=None, **kwargs):
    """Return ``text_data`` unchanged; ``include`` and kwargs are ignored."""
    return text_data


def _get_include_data(text_data, include):
    """Return ``text_data`` with content of ``include`` files appended.

    Each item loaded by ``load_files(read=True)`` is appended on a new line.
    """
    files = load_files(path=include, extensions=[], filters=[], read=True)
    return text_data + "".join("\n" + datum[1] for datum in files)


def _read_ini_files(cfgparser, include, py_label):
    """Feed every file found under ``include`` to ``cfgparser.read``.

    Failures are logged, tagged with ``py_label``, and do not stop the
    remaining files from being read.
    """
    files = load_files(path=include, extensions=[], filters=[], read=False)
    for datum in files:
        try:
            cfgparser.read(datum[1])
        except Exception:
            # report the file that failed, not the unrelated tag text
            log.error(
                "ttp_utils.load_struct: {}, Unable to load ini formatted data from file '{}'".format(
                    py_label, datum[1]
                )
            )


def load_ini(text_data, include=None, **kwargs):
    """Load INI formatted data into a dictionary keyed by section name.

    Files given in ``include`` are read first, ``text_data`` is read after
    them so that its values take precedence. Option names keep their case.

    Args:
        text_data (str): INI formatted text
        include (str): passed to ``load_files`` to find INI files to read

    Returns:
        Dictionary of ``{section: {option: value}}``; an empty ``DEFAULT``
        section is removed. Parsing errors are logged, not raised.
    """
    result = {}
    py_version = _ttp_["python_major_version"]  # noqa: F821
    if py_version == 3:
        import configparser

        cfgparser = configparser.ConfigParser()
        # to make cfgparser keep the case, e.g. VlaN222 will not become vlan222:
        cfgparser.optionxform = str
        # read from ini files first
        if include:
            _read_ini_files(cfgparser, include, "Python3")
        # read from tag text next to make it more specific:
        if text_data:
            try:
                cfgparser.read_string(text_data)
            except Exception:
                log.error(
                    "ttp_utils.load_struct: Python3, Unable to load ini formatted data\n'{}'".format(
                        text_data
                    )
                )
        # convert configparser object into dictionary
        result = {k: dict(cfgparser.items(k)) for k in list(cfgparser.keys())}
    elif py_version == 2:
        import ConfigParser
        import StringIO  # pylint: disable=import-error

        cfgparser = ConfigParser.ConfigParser()
        # to make cfgparser keep the case, e.g. VlaN222 will not become vlan222:
        cfgparser.optionxform = str
        # read from ini files first
        if include:
            _read_ini_files(cfgparser, include, "Python2")
        # read from tag text next to make it more specific:
        if text_data:
            buf_text_data = StringIO.StringIO(text_data)
            try:
                cfgparser.readfp(buf_text_data)
            except Exception:
                log.error(
                    "ttp_utils.load_struct: Python2, Unable to load ini formatted data\n'{}'".format(
                        text_data
                    )
                )
        # convert configparser object into dictionary
        result = {k: dict(cfgparser.items(k)) for k in list(cfgparser.sections())}
    if "DEFAULT" in result:
        if not result["DEFAULT"]:  # delete empty DEFAULT section
            result.pop("DEFAULT")
    return result


def load_python(text_data, include=None, **kwargs):
    """Load data from Python formatted text.

    Args:
        text_data (str): Python formatted text
        include (str): passed to ``load_files``, files' content is appended
            to ``text_data`` before loading

    Returns:
        Empty dictionary if there is no text to load, result of
        ``_ttp_["utils"]["load_python_exec"]`` otherwise, or None if that
        call raises ``SyntaxError`` (the error is logged).
    """
    if include:
        text_data = _get_include_data(text_data, include)
    if not text_data.strip():
        return {}
    # NOTE(review): load_python_exec is defined elsewhere; its name suggests
    # the text is executed as Python code - confirm only trusted data gets here
    try:
        return _ttp_["utils"]["load_python_exec"](text_data)  # noqa: F821
    except SyntaxError as e:
        log.error(
            "ttp_utils.load_struct: Unable to load Python formatted data\n'{}'Make sure that correct loader used to load data, error:\n{}".format(
                text_data, e
            )
        )
    return None


def load_yaml(text_data, include=None, **kwargs):
    """Load YAML formatted data using ``yaml.safe_load``.

    Args:
        text_data (str): YAML formatted text
        include (str): passed to ``load_files``, files' content is appended
            to ``text_data`` before loading

    Returns:
        Whatever ``safe_load`` returns, or empty dictionary if the yaml
        module is not installed or the text fails to load (both are logged).
    """
    try:
        from yaml import safe_load
    except ImportError:
        # ImportError, not ModuleNotFoundError: the latter only exists on
        # Python 3.6+ while this module still carries Python 2 branches
        log.error(
            "loaders.load_yaml: failed to import yaml module, install: 'python -m pip install pyyaml'"
        )
        return {}
    data = {}
    if include:
        text_data = _get_include_data(text_data, include)
    try:
        data = safe_load(text_data)
    except Exception:
        log.error(
            "ttp_utils.load_struct: Unable to load YAML formatted data\n'{}'".format(
                text_data
            )
        )
    return data


def load_json(text_data, include=None, **kwargs):
    """Load JSON formatted data using ``json.loads``.

    Args:
        text_data (str): JSON formatted text
        include (str): passed to ``load_files``, files' content is appended
            to ``text_data`` before loading

    Returns:
        Decoded data, or None if the text fails to load (error is logged).
    """
    if include:
        text_data = _get_include_data(text_data, include)
    try:
        return json.loads(text_data)
    except Exception:
        log.error(
            "ttp_utils.load_struct: Unable to load JSON formatted data\n'{}'".format(
                text_data
            )
        )
    return None


def load_csv(text_data, include=None, **kwargs):
    """Load CSV data and convert it to a dictionary of rows.

    The first non-empty row is used as headers. Each following row becomes a
    ``{header: value}`` dictionary stored under the value of its key column.

    Args:
        text_data (str): CSV formatted text
        include (str): passed to ``load_files``, files' content is appended
            to ``text_data`` before loading
        key (str): header name of the column to use as keys, first column
            is used if not given

    Returns:
        Dictionary of ``{key_column_value: {header: value}}``. Empty
        dictionary if ``key`` is not one of the headers. Values beyond the
        last header are dropped; rows too short to contain the key column
        are skipped with a warning.
    """
    key = kwargs.get("key", None)
    data = {}
    headers = []
    if include:
        text_data = _get_include_data(text_data, include)
    for row in csv.reader(iter(text_data.splitlines())):
        if not row:
            continue
        if not headers:
            headers = row
            if not key:
                key = headers[0]
            elif key not in headers:
                return data
            continue
        # zip stops at the shorter sequence, so rows longer than headers
        # cannot raise IndexError
        record = dict(zip(headers, row))
        if key not in record:
            log.warning(
                "ttp_utils.load_csv: row has no '{}' column, skipping: {}".format(
                    key, row
                )
            )
            continue
        data[record.pop(key)] = record
    return data