1# -*- coding: utf-8 -*- 2''' 3Device worker process 4''' 5from __future__ import absolute_import 6 7# Import python stdlib 8import os 9import re 10import signal 11import logging 12import threading 13from datetime import datetime, timedelta 14 15# Import thrid party libs 16import zmq 17import umsgpack 18from prometheus_client import Counter 19 20# Import napalm-logs pkgs 21import napalm_logs.utils 22import napalm_logs.ext.six as six 23from napalm_logs.proc import NapalmLogsProc 24from napalm_logs.config import PUB_PX_IPC_URL 25from napalm_logs.config import DEV_IPC_URL 26# exceptions 27from napalm_logs.exceptions import NapalmLogsExit 28 29log = logging.getLogger(__name__) 30 31 32class NapalmLogsDeviceProc(NapalmLogsProc): 33 ''' 34 Device sub-process class. 35 ''' 36 def __init__(self, 37 name, 38 opts, 39 config): 40 self._name = name 41 log.debug('Starting process for %s', self._name) 42 self._config = config 43 self.opts = opts 44 self.__up = False 45 self.compiled_messages = None 46 self._compile_messages() 47 48 def _exit_gracefully(self, signum, _): 49 log.debug('Caught signal in %s device process', self._name) 50 self.stop() 51 52 def _setup_ipc(self): 53 ''' 54 Subscribe to the right topic 55 in the device IPC and publish to the 56 publisher proxy. 57 ''' 58 self.ctx = zmq.Context() 59 # subscribe to device IPC 60 log.debug('Creating the dealer IPC for %s', self._name) 61 self.sub = self.ctx.socket(zmq.DEALER) 62 if six.PY2: 63 self.sub.setsockopt(zmq.IDENTITY, self._name) 64 elif six.PY3: 65 self.sub.setsockopt(zmq.IDENTITY, bytes(self._name, 'utf-8')) 66 try: 67 self.sub.setsockopt(zmq.HWM, self.opts['hwm']) 68 # zmq 2 69 except AttributeError: 70 # zmq 3 71 self.sub.setsockopt(zmq.RCVHWM, self.opts['hwm']) 72 # subscribe to the corresponding IPC pipe 73 self.sub.connect(DEV_IPC_URL) 74 # publish to the publisher IPC 75 self.pub = self.ctx.socket(zmq.PUB) 76 self.pub.connect(PUB_PX_IPC_URL) 77 try: 78 self.pub.setsockopt(zmq.HWM, self.opts['hwm']) 79 # zmq 2 80 except AttributeError: 81 # zmq 3 82 self.pub.setsockopt(zmq.SNDHWM, self.opts['hwm']) 83 84 def _compile_messages(self): 85 ''' 86 Create a list of all OS messages and their compiled regexs 87 ''' 88 self.compiled_messages = [] 89 if not self._config: 90 return 91 for message_dict in self._config.get('messages', {}): 92 error = message_dict['error'] 93 tag = message_dict['tag'] 94 model = message_dict['model'] 95 match_on = message_dict.get('match_on', 'tag') 96 if '__python_fun__' in message_dict: 97 self.compiled_messages.append({ 98 'error': error, 99 'tag': tag, 100 'match_on': match_on, 101 'model': model, 102 '__python_fun__': message_dict['__python_fun__'] 103 }) 104 continue 105 values = message_dict['values'] 106 line = message_dict['line'] 107 mapping = message_dict['mapping'] 108 # We will now figure out which position each value is in so we can use it with the match statement 109 position = {} 110 replace = {} 111 for key in values.keys(): 112 if '|' in key: 113 new_key, replace[new_key] = key.replace(' ', '').split('|') 114 values[new_key] = values.pop(key) 115 key = new_key 116 position[line.find('{' + key + '}')] = key 117 sorted_position = {} 118 for i, elem in enumerate(sorted(position.items())): 119 sorted_position[elem[1]] = i + 1 120 # Escape the line, then remove the escape for the curly bracets so they can be used when formatting 121 escaped = re.escape(line).replace(r'\{', '{').replace(r'\}', '}') 122 # Replace a whitespace with \s+ 123 escaped = escaped.replace(r'\ ', r'\s+') 124 self.compiled_messages.append( 125 { 126 'error': error, 127 'tag': tag, 128 'match_on': match_on, 129 'line': re.compile(escaped.format(**values)), 130 'positions': sorted_position, 131 'values': values, 132 'replace': replace, 133 'model': model, 134 'mapping': mapping 135 } 136 ) 137 log.debug('Compiled messages:') 138 log.debug(self.compiled_messages) 139 140 def _parse(self, msg_dict): 141 ''' 142 Parse a syslog message and check what OpenConfig object should 143 be generated. 144 ''' 145 error_present = False 146 # log.debug('Matching the message:') 147 # log.debug(msg_dict) 148 for message in self.compiled_messages: 149 # log.debug('Matching using:') 150 # log.debug(message) 151 match_on = message['match_on'] 152 if match_on not in msg_dict: 153 # log.debug('%s is not a valid key in the partially parsed dict', match_on) 154 continue 155 if message['tag'] != msg_dict[match_on]: 156 continue 157 if '__python_fun__' in message: 158 return { 159 'model': message['model'], 160 'error': message['error'], 161 '__python_fun__': message['__python_fun__'] 162 } 163 error_present = True 164 match = message['line'].search(msg_dict['message']) 165 if not match: 166 continue 167 positions = message.get('positions', {}) 168 values = message.get('values') 169 ret = { 170 'model': message['model'], 171 'mapping': message['mapping'], 172 'replace': message['replace'], 173 'error': message['error'] 174 } 175 for key in values.keys(): 176 # Check if the value needs to be replaced 177 if key in message['replace']: 178 result = napalm_logs.utils.cast(match.group(positions.get(key)), message['replace'][key]) 179 else: 180 result = match.group(positions.get(key)) 181 ret[key] = result 182 return ret 183 if error_present is True: 184 log.info('Configured regex did not match for os: %s tag %s', self._name, msg_dict.get('tag', '')) 185 else: 186 log.info('Syslog message not configured for os: %s tag %s', self._name, msg_dict.get('tag', '')) 187 188 def _emit(self, **kwargs): 189 ''' 190 Emit an OpenConfig object given a certain combination of 191 fields mappeed in the config to the corresponding hierarchy. 192 ''' 193 oc_dict = {} 194 for mapping, result_key in kwargs['mapping']['variables'].items(): 195 result = kwargs[result_key] 196 oc_dict = napalm_logs.utils.setval(mapping.format(**kwargs), result, oc_dict) 197 for mapping, result in kwargs['mapping']['static'].items(): 198 oc_dict = napalm_logs.utils.setval(mapping.format(**kwargs), result, oc_dict) 199 200 return oc_dict 201 202 def _publish(self, obj): 203 ''' 204 Publish the OC object. 205 ''' 206 bin_obj = umsgpack.packb(obj) 207 self.pub.send(bin_obj) 208 209 def _format_time(self, time, date, timezone, prefix_id): 210 # TODO can we work out the time format from the regex? Probably but this is a task for another day 211 time_format = self._config['prefixes'][prefix_id].get('time_format', '') 212 if not time or not date or not time_format: 213 return int(datetime.now().strftime('%s')) 214 # Most syslog do not include the year, so we will add the current year if we are not supplied with one 215 if '%y' in time_format or '%Y' in time_format: 216 parsed_time = datetime.strptime('{} {}'.format(date, time), time_format) 217 else: 218 year = datetime.now().year 219 try: 220 parsed_time = datetime.strptime('{} {} {}'.format(year, date, time), '%Y {}'.format(time_format)) 221 # If the timestamp is in the future then it is likely that the year 222 # is wrong. We subtract 1 day from the parsed time to eleminate any 223 # difference between clocks. 224 if parsed_time - timedelta(days=1) > datetime.now(): 225 parsed_time = datetime.strptime( 226 '{} {} {}'.format(year - 1, date, time), 227 '%Y {}'.format(time_format) 228 ) 229 except ValueError: 230 # It is rare but by appending the year from the server, we could produce 231 # an invalid date such as February 29, 2018 (2018 is not a leap year). This 232 # is caused by the device emitting the syslog having an incorrect local date set. 233 # In such cases, we fall back to the full date from the server and log this action. 234 parsed_time = datetime.now().strftime(time_format) 235 log.info( 236 "Invalid date produced while formatting syslog date. Falling back to server date [%s]", 237 self._name 238 ) 239 return int((parsed_time - datetime(1970, 1, 1)).total_seconds()) 240 241 def start(self): 242 ''' 243 Start the worker process. 244 ''' 245 # metrics 246 napalm_logs_device_messages_received = Counter( 247 'napalm_logs_device_messages_received', 248 "Count of messages received by the device process", 249 ['device_os'] 250 ) 251 napalm_logs_device_raw_published_messages = Counter( 252 'napalm_logs_device_raw_published_messages', 253 "Count of raw type published messages", 254 ['device_os'] 255 ) 256 napalm_logs_device_published_messages = Counter( 257 'napalm_logs_device_published_messages', 258 "Count of published messages", 259 ['device_os'] 260 ) 261 napalm_logs_device_oc_object_failed = Counter( 262 'napalm_logs_device_oc_object_failed', 263 "Counter of failed OpenConfig object generations", 264 ['device_os'] 265 ) 266 267 self._setup_ipc() 268 # Start suicide polling thread 269 thread = threading.Thread(target=self._suicide_when_without_parent, args=(os.getppid(),)) 270 thread.start() 271 signal.signal(signal.SIGTERM, self._exit_gracefully) 272 self.__up = True 273 while self.__up: 274 # bin_obj = self.sub.recv() 275 # msg_dict, address = umsgpack.unpackb(bin_obj, use_list=False) 276 try: 277 bin_obj = self.sub.recv() 278 msg_dict, address = umsgpack.unpackb(bin_obj, use_list=False) 279 except zmq.ZMQError as error: 280 if self.__up is False: 281 log.info('Exiting on process shutdown [%s]', self._name) 282 return 283 else: 284 raise NapalmLogsExit(error) 285 log.debug('%s: dequeued %s, received from %s', self._name, msg_dict, address) 286 napalm_logs_device_messages_received.labels(device_os=self._name).inc() 287 host = msg_dict.get('host') 288 prefix_id = msg_dict.pop('__prefix_id__') 289 if 'timestamp' in msg_dict: 290 timestamp = msg_dict.pop('timestamp') 291 else: 292 timestamp = self._format_time(msg_dict.get('time', ''), 293 msg_dict.get('date', ''), 294 msg_dict.get('timeZone', 'UTC'), 295 prefix_id) 296 facility = msg_dict.get('facility') 297 severity = msg_dict.get('severity') 298 299 kwargs = self._parse(msg_dict) 300 if not kwargs: 301 # Unable to identify what model to generate for the message in cause. 302 # But publish the message when the user requested to push raw messages. 303 to_publish = { 304 'ip': address, 305 'host': host, 306 'timestamp': timestamp, 307 'message_details': msg_dict, 308 'os': self._name, 309 'error': 'RAW', 310 'model_name': 'raw', 311 'facility': facility, 312 'severity': severity 313 } 314 log.debug('Queueing to be published:') 315 log.debug(to_publish) 316 # self.pub_pipe.send(to_publish) 317 self.pub.send(umsgpack.packb(to_publish)) 318 napalm_logs_device_raw_published_messages.labels(device_os=self._name).inc() 319 continue 320 try: 321 if '__python_fun__' in kwargs: 322 log.debug('Using the Python parser to determine the YANG-equivalent object') 323 yang_obj = kwargs['__python_fun__'](msg_dict) 324 else: 325 yang_obj = self._emit(**kwargs) 326 except Exception: 327 log.exception('Unexpected error when generating the OC object.', exc_info=True) 328 napalm_logs_device_oc_object_failed.labels(device_os=self._name).inc() 329 continue 330 log.debug('Generated OC object:') 331 log.debug(yang_obj) 332 error = kwargs.get('error') 333 model_name = kwargs.get('model') 334 to_publish = { 335 'error': error, 336 'host': host, 337 'ip': address, 338 'timestamp': timestamp, 339 'yang_message': yang_obj, 340 'message_details': msg_dict, 341 'yang_model': model_name, 342 'os': self._name, 343 'facility': facility, 344 'severity': severity 345 } 346 log.debug('Queueing to be published:') 347 log.debug(to_publish) 348 # self.pub_pipe.send(to_publish) 349 self.pub.send(umsgpack.packb(to_publish)) 350 # self._publish(to_publish) 351 napalm_logs_device_published_messages.labels(device_os=self._name).inc() 352 353 def stop(self): 354 ''' 355 Stop the worker process. 356 ''' 357 log.info('Stopping %s device process', self._name) 358 self.__up = False 359 self.sub.close() 360 self.pub.close() 361 self.ctx.term() 362 # self.pipe.close() 363 # self.pub_pipe.close() 364