1# Copyright (c) 2017 RIPE NCC 2# 3# This program is free software: you can redistribute it and/or modify 4# it under the terms of the GNU General Public License as published by 5# the Free Software Foundation, either version 3 of the License, or 6# (at your option) any later version. 7# 8# This program is distributed in the hope that it will be useful, 9# but WITHOUT ANY WARRANTY; without even the implied warranty of 10# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 11# GNU General Public License for more details. 12# 13# You should have received a copy of the GNU General Public License 14# along with this program. If not, see <http://www.gnu.org/licenses/>. 15 16from tzlocal import get_localzone 17 18from ..helpers.colours import colourise 19from ..helpers.sanitisers import sanitise 20from .base import Renderer as BaseRenderer 21 22 23class Renderer(BaseRenderer): 24 25 RENDERS = [BaseRenderer.TYPE_DNS] 26 TIME_FORMAT = "%Y-%m-%d %H:%M:%S" 27 ANSWER_COLORS = ['cyan', 'blue'] 28 29 def on_result(self, result): 30 31 created = result.created.astimezone(get_localzone()) 32 probe_id = result.probe_id 33 34 r = [] 35 if result.responses: 36 for response in result.responses: 37 r.append(self.get_formatted_response(probe_id, 38 created, response)) 39 else: 40 r.append("{}{}\n".format(self.get_header(probe_id, created), 41 colourise("No response found", "red"), 42 )) 43 44 return "".join(r) 45 46 @classmethod 47 def get_header(cls, probe_id, created): 48 return "Probe {0:>6}: {1} ".format("#{}".format(probe_id), 49 created.strftime(cls.TIME_FORMAT), 50 ) 51 52 @classmethod 53 def get_formatted_response(cls, probe_id, created, response): 54 55 s = [] 56 answers = "" 57 if response.abuf: 58 header_flags = [] 59 for flag in ("aa", "ad", "cd", "qr", "ra", "rd",): 60 if getattr(response.abuf.header, flag): 61 header_flags.append(flag) 62 s.append(response.abuf.header.return_code) 63 s.append(" ".join(header_flags)) 64 answers = cls.print_answers(response.abuf.answers) 65 else: 66 s.append("No abuf found") 67 68 if response.is_error or not response.abuf: 69 color = "red" 70 elif len(response.abuf.answers) == 0: 71 color = "yellow" 72 else: 73 color = "green" 74 75 status = colourise(" ".join(s), color) 76 return "". join([cls.get_header(probe_id, created), 77 status, 78 " " if answers else "", 79 answers, 80 "\n" 81 ]) 82 83 @staticmethod 84 def get_rrdata(data): 85 """ 86 Return RRdata in condensed text form. 87 """ 88 if not data: 89 return "" 90 # It's too complicated to override __str__ method of all Answer 91 # classes of Sagan so let's compress it text-wise instead 92 r = str(data).split() 93 r.pop(2) # get rid of the class 94 return sanitise(" ".join(r)) 95 96 @classmethod 97 def print_answers(cls, data): 98 """ 99 Return list of colourised condensed textual RRdata. 100 """ 101 r = [] 102 for record, color in zip(data, cls.ANSWER_COLORS*len(data)): 103 r.append(colourise(cls.get_rrdata(record), color)) 104 return "; ".join(r) 105