1#!/usr/bin/env python 2# Copyright (c) 2015 MetPy Developers. 3# Distributed under the terms of the BSD 3-Clause License. 4# SPDX-License-Identifier: BSD-3-Clause 5"""Parse specification extracted from NEXRAD ICD PDFs and generate Python code.""" 6 7import warnings 8 9 10def register_processor(num): 11 """Register functions to handle particular message numbers.""" 12 def inner(func): 13 """Perform actual function registration.""" 14 processors[num] = func 15 return func 16 return inner 17 18 19processors = {} 20 21 22@register_processor(3) 23def process_msg3(fname): 24 """Handle information for message type 3.""" 25 with open(fname, 'r') as infile: 26 info = [] 27 for lineno, line in enumerate(infile): 28 parts = line.split(' ') 29 try: 30 var_name, desc, typ, units = parts[:4] 31 size_hw = parts[-1] 32 if '-' in size_hw: 33 start, end = map(int, size_hw.split('-')) 34 size = (end - start + 1) * 2 35 else: 36 size = 2 37 38 assert size >= 2 39 fmt = fix_type(typ, size) 40 41 var_name = fix_var_name(var_name) 42 full_desc = fix_desc(desc, units) 43 44 info.append({'name': var_name, 'desc': full_desc, 'fmt': fmt}) 45 46 if ignored_item(info[-1]) and var_name != 'Spare': 47 warnings.warn('{} has type {}. Setting as Spare'.format(var_name, typ)) 48 49 except (ValueError, AssertionError): 50 warnings.warn('{} > {}'.format(lineno + 1, ':'.join(parts))) 51 raise 52 return info 53 54 55@register_processor(18) 56def process_msg18(fname): 57 """Handle information for message type 18.""" 58 with open(fname, 'r') as infile: 59 info = [] 60 for lineno, line in enumerate(infile): 61 parts = line.split(' ') 62 try: 63 if len(parts) == 8: 64 parts = parts[:6] + [parts[6] + parts[7]] 65 66 var_name, desc, typ, units, rng, prec, byte_range = parts 67 start, end = map(int, byte_range.split('-')) 68 size = end - start + 1 69 assert size >= 4 70 fmt = fix_type(typ, size, 71 additional=[('See Note (5)', ('{size}s', 1172))]) 72 73 if ' ' in var_name: 74 warnings.warn('Space in {}'.format(var_name)) 75 if not desc: 76 warnings.warn('null description for {}'.format(var_name)) 77 78 var_name = fix_var_name(var_name) 79 full_desc = fix_desc(desc, units) 80 81 info.append({'name': var_name, 'desc': full_desc, 'fmt': fmt}) 82 83 if (ignored_item(info[-1]) and var_name != 'SPARE' 84 and 'SPARE' not in full_desc): 85 warnings.warn('{} has type {}. Setting as SPARE'.format(var_name, typ)) 86 87 except (ValueError, AssertionError): 88 warnings.warn('{} > {}'.format(lineno + 1, ':'.join(parts))) 89 raise 90 return info 91 92 93types = [('Real*4', ('f', 4)), ('Integer*4', ('L', 4)), ('SInteger*4', ('l', 4)), 94 ('Integer*2', ('H', 2)), 95 ('', lambda s: ('{size}x', s)), ('N/A', lambda s: ('{size}x', s)), 96 (lambda t: t.startswith('String'), lambda s: ('{size}s', s))] 97 98 99def fix_type(typ, size, additional=None): 100 """Fix up creating the appropriate struct type based on the information in the column.""" 101 if additional is not None: 102 my_types = types + additional 103 else: 104 my_types = types 105 106 for t, info in my_types: 107 if callable(t): 108 matches = t(typ) 109 else: 110 matches = t == typ 111 112 if matches: 113 if callable(info): 114 fmt_str, true_size = info(size) 115 else: 116 fmt_str, true_size = info 117 assert size == true_size, ('{}: Got size {} instead of {}'.format(typ, size, 118 true_size)) 119 return fmt_str.format(size=size) 120 121 raise ValueError('No type match! ({})'.format(typ)) 122 123 124def fix_var_name(var_name): 125 """Clean up and apply standard formatting to variable names.""" 126 name = var_name.strip() 127 for char in '(). /#,': 128 name = name.replace(char, '_') 129 name = name.replace('+', 'pos_') 130 name = name.replace('-', 'neg_') 131 if name.endswith('_'): 132 name = name[:-1] 133 return name 134 135 136def fix_desc(desc, units=None): 137 """Clean up description column.""" 138 full_desc = desc.strip() 139 if units and units != 'N/A': 140 if full_desc: 141 full_desc += ' (' + units + ')' 142 else: 143 full_desc = units 144 return full_desc 145 146 147def ignored_item(item): 148 """Determine whether this item should be ignored.""" 149 return item['name'].upper() == 'SPARE' or 'x' in item['fmt'] 150 151 152def need_desc(item): 153 """Determine whether we need a description for this item.""" 154 return item['desc'] and not ignored_item(item) 155 156 157def field_name(item): 158 """Return the field name if appropriate.""" 159 return '"{:s}"'.format(item['name']) if not ignored_item(item) else None 160 161 162def field_fmt(item): 163 """Return the field format if appropriate.""" 164 return '"{:s}"'.format(item['fmt']) if '"' not in item['fmt'] else item['fmt'] 165 166 167def write_file(fname, info): 168 """Write out the generated Python code.""" 169 with open(fname, 'w') as outfile: 170 # File header 171 outfile.write('# Copyright (c) 2018 MetPy Developers.\n') 172 outfile.write('# Distributed under the terms of the BSD 3-Clause License.\n') 173 outfile.write('# SPDX-License-Identifier: BSD-3-Clause\n\n') 174 outfile.write('# flake8: noqa\n') 175 outfile.write('# Generated file -- do not modify\n') 176 177 # Variable descriptions 178 outfile.write('descriptions = {') 179 outdata = ',\n '.join('"{name}": "{desc}"'.format( 180 **i) for i in info if need_desc(i)) 181 outfile.write(outdata) 182 outfile.write('}\n\n') 183 184 # Now the struct format 185 outfile.write('fields = [') 186 outdata = ',\n '.join('({fname}, "{fmt}")'.format( 187 fname=field_name(i), **i) for i in info) 188 outfile.write(outdata) 189 outfile.write(']\n') 190 191 192if __name__ == '__main__': 193 from pathlib import Path 194 195 for num in [18, 3]: 196 fname = 'msg{:d}.spec'.format(num) 197 print(f'Processing {fname}...') # noqa: T001 198 info = processors[num](fname) 199 fname = Path(fname).with_suffix('.py') 200 write_file(fname, info) 201