1# -*- coding: utf-8 -*-
2'''
3Device worker process
4'''
5from __future__ import absolute_import
6
7# Import python stdlib
8import os
9import re
10import signal
11import logging
12import threading
13from datetime import datetime, timedelta
14
15# Import thrid party libs
16import zmq
17import umsgpack
18from prometheus_client import Counter
19
20# Import napalm-logs pkgs
21import napalm_logs.utils
22import napalm_logs.ext.six as six
23from napalm_logs.proc import NapalmLogsProc
24from napalm_logs.config import PUB_PX_IPC_URL
25from napalm_logs.config import DEV_IPC_URL
26# exceptions
27from napalm_logs.exceptions import NapalmLogsExit
28
29log = logging.getLogger(__name__)
30
31
32class NapalmLogsDeviceProc(NapalmLogsProc):
33    '''
34    Device sub-process class.
35    '''
36    def __init__(self,
37                 name,
38                 opts,
39                 config):
40        self._name = name
41        log.debug('Starting process for %s', self._name)
42        self._config = config
43        self.opts = opts
44        self.__up = False
45        self.compiled_messages = None
46        self._compile_messages()
47
48    def _exit_gracefully(self, signum, _):
49        log.debug('Caught signal in %s device process', self._name)
50        self.stop()
51
52    def _setup_ipc(self):
53        '''
54        Subscribe to the right topic
55        in the device IPC and publish to the
56        publisher proxy.
57        '''
58        self.ctx = zmq.Context()
59        # subscribe to device IPC
60        log.debug('Creating the dealer IPC for %s', self._name)
61        self.sub = self.ctx.socket(zmq.DEALER)
62        if six.PY2:
63            self.sub.setsockopt(zmq.IDENTITY, self._name)
64        elif six.PY3:
65            self.sub.setsockopt(zmq.IDENTITY, bytes(self._name, 'utf-8'))
66        try:
67            self.sub.setsockopt(zmq.HWM, self.opts['hwm'])
68            # zmq 2
69        except AttributeError:
70            # zmq 3
71            self.sub.setsockopt(zmq.RCVHWM, self.opts['hwm'])
72        # subscribe to the corresponding IPC pipe
73        self.sub.connect(DEV_IPC_URL)
74        # publish to the publisher IPC
75        self.pub = self.ctx.socket(zmq.PUB)
76        self.pub.connect(PUB_PX_IPC_URL)
77        try:
78            self.pub.setsockopt(zmq.HWM, self.opts['hwm'])
79            # zmq 2
80        except AttributeError:
81            # zmq 3
82            self.pub.setsockopt(zmq.SNDHWM, self.opts['hwm'])
83
84    def _compile_messages(self):
85        '''
86        Create a list of all OS messages and their compiled regexs
87        '''
88        self.compiled_messages = []
89        if not self._config:
90            return
91        for message_dict in self._config.get('messages', {}):
92            error = message_dict['error']
93            tag = message_dict['tag']
94            model = message_dict['model']
95            match_on = message_dict.get('match_on', 'tag')
96            if '__python_fun__' in message_dict:
97                self.compiled_messages.append({
98                    'error': error,
99                    'tag': tag,
100                    'match_on': match_on,
101                    'model': model,
102                    '__python_fun__': message_dict['__python_fun__']
103                })
104                continue
105            values = message_dict['values']
106            line = message_dict['line']
107            mapping = message_dict['mapping']
108            # We will now figure out which position each value is in so we can use it with the match statement
109            position = {}
110            replace = {}
111            for key in values.keys():
112                if '|' in key:
113                    new_key, replace[new_key] = key.replace(' ', '').split('|')
114                    values[new_key] = values.pop(key)
115                    key = new_key
116                position[line.find('{' + key + '}')] = key
117            sorted_position = {}
118            for i, elem in enumerate(sorted(position.items())):
119                sorted_position[elem[1]] = i + 1
120            # Escape the line, then remove the escape for the curly bracets so they can be used when formatting
121            escaped = re.escape(line).replace(r'\{', '{').replace(r'\}', '}')
122            # Replace a whitespace with \s+
123            escaped = escaped.replace(r'\ ', r'\s+')
124            self.compiled_messages.append(
125                {
126                    'error': error,
127                    'tag': tag,
128                    'match_on': match_on,
129                    'line': re.compile(escaped.format(**values)),
130                    'positions': sorted_position,
131                    'values': values,
132                    'replace': replace,
133                    'model': model,
134                    'mapping': mapping
135                }
136            )
137        log.debug('Compiled messages:')
138        log.debug(self.compiled_messages)
139
140    def _parse(self, msg_dict):
141        '''
142        Parse a syslog message and check what OpenConfig object should
143        be generated.
144        '''
145        error_present = False
146        # log.debug('Matching the message:')
147        # log.debug(msg_dict)
148        for message in self.compiled_messages:
149            # log.debug('Matching using:')
150            # log.debug(message)
151            match_on = message['match_on']
152            if match_on not in msg_dict:
153                # log.debug('%s is not a valid key in the partially parsed dict', match_on)
154                continue
155            if message['tag'] != msg_dict[match_on]:
156                continue
157            if '__python_fun__' in message:
158                return {
159                    'model': message['model'],
160                    'error': message['error'],
161                    '__python_fun__': message['__python_fun__']
162                }
163            error_present = True
164            match = message['line'].search(msg_dict['message'])
165            if not match:
166                continue
167            positions = message.get('positions', {})
168            values = message.get('values')
169            ret = {
170                'model': message['model'],
171                'mapping': message['mapping'],
172                'replace': message['replace'],
173                'error': message['error']
174            }
175            for key in values.keys():
176                # Check if the value needs to be replaced
177                if key in message['replace']:
178                    result = napalm_logs.utils.cast(match.group(positions.get(key)), message['replace'][key])
179                else:
180                    result = match.group(positions.get(key))
181                ret[key] = result
182            return ret
183        if error_present is True:
184            log.info('Configured regex did not match for os: %s tag %s', self._name, msg_dict.get('tag', ''))
185        else:
186            log.info('Syslog message not configured for os: %s tag %s', self._name, msg_dict.get('tag', ''))
187
188    def _emit(self, **kwargs):
189        '''
190        Emit an OpenConfig object given a certain combination of
191        fields mappeed in the config to the corresponding hierarchy.
192        '''
193        oc_dict = {}
194        for mapping, result_key in kwargs['mapping']['variables'].items():
195            result = kwargs[result_key]
196            oc_dict = napalm_logs.utils.setval(mapping.format(**kwargs), result, oc_dict)
197        for mapping, result in kwargs['mapping']['static'].items():
198            oc_dict = napalm_logs.utils.setval(mapping.format(**kwargs), result, oc_dict)
199
200        return oc_dict
201
202    def _publish(self, obj):
203        '''
204        Publish the OC object.
205        '''
206        bin_obj = umsgpack.packb(obj)
207        self.pub.send(bin_obj)
208
209    def _format_time(self, time, date, timezone, prefix_id):
210        # TODO can we work out the time format from the regex? Probably but this is a task for another day
211        time_format = self._config['prefixes'][prefix_id].get('time_format', '')
212        if not time or not date or not time_format:
213            return int(datetime.now().strftime('%s'))
214        # Most syslog do not include the year, so we will add the current year if we are not supplied with one
215        if '%y' in time_format or '%Y' in time_format:
216            parsed_time = datetime.strptime('{} {}'.format(date, time), time_format)
217        else:
218            year = datetime.now().year
219            try:
220                parsed_time = datetime.strptime('{} {} {}'.format(year, date, time), '%Y {}'.format(time_format))
221                # If the timestamp is in the future then it is likely that the year
222                # is wrong. We subtract 1 day from the parsed time to eleminate any
223                # difference between clocks.
224                if parsed_time - timedelta(days=1) > datetime.now():
225                    parsed_time = datetime.strptime(
226                        '{} {} {}'.format(year - 1, date, time),
227                        '%Y {}'.format(time_format)
228                    )
229            except ValueError:
230                # It is rare but by appending the year from the server, we could produce
231                # an invalid date such as February 29, 2018 (2018 is not a leap year). This
232                # is caused by the device emitting the syslog having an incorrect local date set.
233                # In such cases, we fall back to the full date from the server and log this action.
234                parsed_time = datetime.now().strftime(time_format)
235                log.info(
236                    "Invalid date produced while formatting syslog date. Falling back to server date [%s]",
237                    self._name
238                )
239        return int((parsed_time - datetime(1970, 1, 1)).total_seconds())
240
241    def start(self):
242        '''
243        Start the worker process.
244        '''
245        # metrics
246        napalm_logs_device_messages_received = Counter(
247            'napalm_logs_device_messages_received',
248            "Count of messages received by the device process",
249            ['device_os']
250        )
251        napalm_logs_device_raw_published_messages = Counter(
252            'napalm_logs_device_raw_published_messages',
253            "Count of raw type published messages",
254            ['device_os']
255        )
256        napalm_logs_device_published_messages = Counter(
257            'napalm_logs_device_published_messages',
258            "Count of published messages",
259            ['device_os']
260        )
261        napalm_logs_device_oc_object_failed = Counter(
262            'napalm_logs_device_oc_object_failed',
263            "Counter of failed OpenConfig object generations",
264            ['device_os']
265        )
266
267        self._setup_ipc()
268        # Start suicide polling thread
269        thread = threading.Thread(target=self._suicide_when_without_parent, args=(os.getppid(),))
270        thread.start()
271        signal.signal(signal.SIGTERM, self._exit_gracefully)
272        self.__up = True
273        while self.__up:
274            # bin_obj = self.sub.recv()
275            # msg_dict, address = umsgpack.unpackb(bin_obj, use_list=False)
276            try:
277                bin_obj = self.sub.recv()
278                msg_dict, address = umsgpack.unpackb(bin_obj, use_list=False)
279            except zmq.ZMQError as error:
280                if self.__up is False:
281                    log.info('Exiting on process shutdown [%s]', self._name)
282                    return
283                else:
284                    raise NapalmLogsExit(error)
285            log.debug('%s: dequeued %s, received from %s', self._name, msg_dict, address)
286            napalm_logs_device_messages_received.labels(device_os=self._name).inc()
287            host = msg_dict.get('host')
288            prefix_id = msg_dict.pop('__prefix_id__')
289            if 'timestamp' in msg_dict:
290                timestamp = msg_dict.pop('timestamp')
291            else:
292                timestamp = self._format_time(msg_dict.get('time', ''),
293                                              msg_dict.get('date', ''),
294                                              msg_dict.get('timeZone', 'UTC'),
295                                              prefix_id)
296            facility = msg_dict.get('facility')
297            severity = msg_dict.get('severity')
298
299            kwargs = self._parse(msg_dict)
300            if not kwargs:
301                # Unable to identify what model to generate for the message in cause.
302                # But publish the message when the user requested to push raw messages.
303                to_publish = {
304                    'ip': address,
305                    'host': host,
306                    'timestamp': timestamp,
307                    'message_details': msg_dict,
308                    'os': self._name,
309                    'error': 'RAW',
310                    'model_name': 'raw',
311                    'facility': facility,
312                    'severity': severity
313                }
314                log.debug('Queueing to be published:')
315                log.debug(to_publish)
316                # self.pub_pipe.send(to_publish)
317                self.pub.send(umsgpack.packb(to_publish))
318                napalm_logs_device_raw_published_messages.labels(device_os=self._name).inc()
319                continue
320            try:
321                if '__python_fun__' in kwargs:
322                    log.debug('Using the Python parser to determine the YANG-equivalent object')
323                    yang_obj = kwargs['__python_fun__'](msg_dict)
324                else:
325                    yang_obj = self._emit(**kwargs)
326            except Exception:
327                log.exception('Unexpected error when generating the OC object.', exc_info=True)
328                napalm_logs_device_oc_object_failed.labels(device_os=self._name).inc()
329                continue
330            log.debug('Generated OC object:')
331            log.debug(yang_obj)
332            error = kwargs.get('error')
333            model_name = kwargs.get('model')
334            to_publish = {
335                'error': error,
336                'host': host,
337                'ip': address,
338                'timestamp': timestamp,
339                'yang_message': yang_obj,
340                'message_details': msg_dict,
341                'yang_model': model_name,
342                'os': self._name,
343                'facility': facility,
344                'severity': severity
345            }
346            log.debug('Queueing to be published:')
347            log.debug(to_publish)
348            # self.pub_pipe.send(to_publish)
349            self.pub.send(umsgpack.packb(to_publish))
350            # self._publish(to_publish)
351            napalm_logs_device_published_messages.labels(device_os=self._name).inc()
352
353    def stop(self):
354        '''
355        Stop the worker process.
356        '''
357        log.info('Stopping %s device process', self._name)
358        self.__up = False
359        self.sub.close()
360        self.pub.close()
361        self.ctx.term()
362        # self.pipe.close()
363        # self.pub_pipe.close()
364