"""
An engine that sends events to the Logentries logging service.

:maintainer:  Jimmy Tang (jimmy_tang@rapid7.com)
:maturity:    New
:depends:     ssl, certifi
:platform:    all

.. versionadded:: 2016.3.0

To enable this engine the master and/or minion will need the following
python libraries

    ssl
    certifi

If you are running a new enough version of python then the ssl library
will be present already.

You will also need the following values configured in the minion or
master config.

:configuration:

    Example configuration

    .. code-block:: yaml

        engines:
          - logentries:
              endpoint: data.logentries.com
              port: 10000
              token: 057af3e2-1c05-47c5-882a-5cd644655dbf

The 'token' can be obtained from the Logentries service.

To test this engine

    .. code-block:: bash

         salt '*' test.ping cmd.run uptime

"""

import logging
import random
import socket
import time
import uuid

import salt.utils.event
import salt.utils.json
import salt.utils.stringutils

try:
    import certifi

    HAS_CERTIFI = True
except ImportError:
    HAS_CERTIFI = False

# This is here for older python installs, it is needed to setup an encrypted tcp connection
try:
    import ssl

    HAS_SSL = True
except ImportError:  # for systems without TLS support.
    HAS_SSL = False


log = logging.getLogger(__name__)


def __virtual__():
    """
    Only load this engine when both ``certifi`` and ``ssl`` are importable.
    """
    return HAS_CERTIFI and HAS_SSL


class PlainTextSocketAppender:
    """
    Send newline-terminated text events to Logentries over a plain TCP socket.

    The connection is (re)established with exponential back-off plus jitter,
    bounded by ``MIN_DELAY`` and ``MAX_DELAY`` seconds.
    """

    def __init__(
        self, verbose=True, LE_API="data.logentries.com", LE_PORT=80, LE_TLS_PORT=443
    ):
        """
        :param verbose: when true, log a warning on every failed connection attempt
        :param LE_API: host name of the Logentries endpoint
        :param LE_PORT: port used by the plain-text connection
        :param LE_TLS_PORT: port used by the TLS subclass
        """
        self.LE_API = LE_API
        self.LE_PORT = LE_PORT
        self.LE_TLS_PORT = LE_TLS_PORT
        self.MIN_DELAY = 0.1
        self.MAX_DELAY = 10
        # Error message displayed when an incorrect Token has been detected
        self.INVALID_TOKEN = (
            "\n\nIt appears the LOGENTRIES_TOKEN "
            "parameter you entered is incorrect!\n\n"
        )
        # Encoded unicode line separator
        self.LINE_SEP = salt.utils.stringutils.to_str("\u2028")

        self.verbose = verbose
        self._conn = None

    def open_connection(self):
        """
        Open a plain TCP connection to ``LE_API:LE_PORT``.

        The socket is closed again if the connect fails so that repeated
        retries do not leak file descriptors.
        """
        conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            conn.connect((self.LE_API, self.LE_PORT))
        except Exception:  # pylint: disable=broad-except
            conn.close()
            raise
        self._conn = conn

    def reopen_connection(self):
        """
        Close any current connection and retry ``open_connection`` until it
        succeeds, sleeping with exponential back-off and jitter in between.

        This call blocks until a connection has been established.
        """
        self.close_connection()

        root_delay = self.MIN_DELAY
        while True:
            try:
                self.open_connection()
                return
            except Exception:  # pylint: disable=broad-except
                if self.verbose:
                    log.warning("Unable to connect to Logentries")

            # Double the base delay on every failure, capped at MAX_DELAY.
            root_delay = min(root_delay * 2, self.MAX_DELAY)

            # Jitter: wait somewhere between one and two times the base delay.
            time.sleep(root_delay + random.uniform(0, root_delay))

    def close_connection(self):
        """
        Close the current socket, if any.

        ``_conn`` is intentionally left pointing at the closed socket: ``put``
        relies on the ``OSError`` raised by sending on it to trigger a
        reconnect.
        """
        if self._conn is not None:
            self._conn.close()

    def put(self, data):
        """
        Send one event to Logentries, reconnecting as often as necessary.

        :param data: the event text (``str``; UTF-8 ``bytes`` are also accepted)
        """
        if isinstance(data, bytes):
            data = data.decode("utf-8")

        # Replace newlines with Unicode line separator for multi-line events
        multiline = data.replace("\n", self.LINE_SEP) + "\n"
        # Sockets only accept bytes on Python 3; encode once, outside the loop.
        payload = multiline.encode("utf-8")

        if self._conn is None:
            self.reopen_connection()

        # Send data, reconnect if needed
        while True:
            try:
                # sendall() keeps writing until the whole payload is out,
                # whereas send() may stop after a partial write.
                self._conn.sendall(payload)
            except OSError:
                self.reopen_connection()
                continue
            break

        # NOTE(review): the connection is closed after every event, so the
        # next put() reconnects. Kept as in the original implementation --
        # confirm whether one connection per event is intended.
        self.close_connection()


if HAS_SSL:

    class TLSSocketAppender(PlainTextSocketAppender):
        """
        Variant of ``PlainTextSocketAppender`` that talks TLS to
        ``LE_API:LE_TLS_PORT``, verifying the server against certifi's CA
        bundle.
        """

        def open_connection(self):
            """
            Open a certificate- and hostname-verified TLS connection.

            ``ssl.wrap_socket`` was removed in Python 3.12, so an
            ``SSLContext`` is used instead.
            """
            context = ssl.create_default_context(cafile=certifi.where())
            if hasattr(ssl, "TLSVersion"):
                # The original code asked for TLS 1.2 when available.
                context.minimum_version = ssl.TLSVersion.TLSv1_2

            raw = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                conn = context.wrap_socket(raw, server_hostname=self.LE_API)
            except Exception:  # pylint: disable=broad-except
                raw.close()
                raise

            try:
                # The TLS handshake happens as part of connect().
                conn.connect((self.LE_API, self.LE_TLS_PORT))
            except Exception:  # pylint: disable=broad-except
                conn.close()
                raise
            self._conn = conn

    SocketAppender = TLSSocketAppender
else:  # for systems without TLS support.
    SocketAppender = PlainTextSocketAppender


def event_bus_context(opts):
    """
    Return a listening event bus for the master or the minion.

    The master bus is chosen when ``opts["id"]`` ends with ``_master``.
    """
    if opts.get("id").endswith("_master"):
        event_bus = salt.utils.event.get_master_event(
            opts, opts["sock_dir"], listen=True
        )
    else:
        event_bus = salt.utils.event.get_event(
            "minion",
            transport=opts["transport"],
            opts=opts,
            sock_dir=opts["sock_dir"],
            listen=True,
        )
    return event_bus


def start(
    endpoint="data.logentries.com",
    port=10000,
    token=None,
    tag="salt/engines/logentries",
):
    """
    Listen to salt events and forward them to Logentries

    :param endpoint: host name of the Logentries endpoint
    :param port: passed to the appender as ``LE_PORT``; ``TLSSocketAppender``
        connects to ``LE_TLS_PORT`` (default 443) and does not use this value
    :param token: Logentries log token, expected to be a UUID string
    :param tag: tag written after the token on every forwarded line
    """
    if token is None:
        # Without a token nothing useful can be sent; previously this crashed
        # with a TypeError raised by uuid.UUID(None).
        log.error("No logentries token configured, engine not started")
        return

    try:
        uuid.UUID(token)
    except (ValueError, AttributeError, TypeError):
        log.warning("Not a valid logentries token")

    with event_bus_context(__opts__) as event_bus:
        log.debug("Logentries engine started")

        # Token and tag never change, so build the line prefix once.
        prefix = " ".join(
            (
                salt.utils.stringutils.to_str(token),
                salt.utils.stringutils.to_str(tag),
            )
        )

        appender = SocketAppender(verbose=False, LE_API=endpoint, LE_PORT=port)
        appender.reopen_connection()

        try:
            while True:
                event = event_bus.get_event()
                if event:
                    appender.put(" ".join((prefix, salt.utils.json.dumps(event))))
        finally:
            # Runs when the loop is left through an exception or shutdown.
            appender.close_connection()