from time import sleep
import logging

from jnpr.junos import exception as EzErrors
from jnpr.junos.transport.tty_netconf import tty_netconf

logger = logging.getLogger("jnpr.junos.tty")

__all__ = ["Terminal"]

# =========================================================================
# Terminal class
# =========================================================================


class Terminal(object):

    """
    Terminal is used to bootstrap Junos New Out of the Box (NOOB) device
    over the CONSOLE port.  The general use-case is to setup the minimal
    configuration so that the device is IP reachable using SSH
    and NETCONF for remote management.

    Serial is needed for Junos devices that do not support
    the DHCP 'auto-installation' or 'ZTP' feature; i.e. you *MUST*
    do the NOOB configuration via the CONSOLE.

    Serial is also useful for situations even when the Junos
    device supports auto-DHCP, but is not an option due to the
    specific situation.

    This class only implements the login/logout state-machines.  It calls
    ``self.read_prompt()``, ``self.write()``, ``self._tty_open()`` and
    ``self._tty_close()`` and reads ``self._tty_name``; none of these are
    defined here, so they are expected to be supplied by the concrete
    transport (presumably a subclass -- confirm against the rest of the
    package).
    """

    TIMEOUT = 0.2  # serial readline timeout, seconds
    EXPECT_TIMEOUT = 10  # total read timeout, seconds
    LOGIN_RETRY = 20  # total number of passes thru login state-machine
    LOADER_BOOT_WAIT = 300  # seconds to wait after issuing "boot" at loader

    _ST_INIT = 0
    _ST_LOADER = 1
    _ST_LOGIN = 2
    _ST_PASSWD = 3
    _ST_DONE = 4
    _ST_BAD_PASSWD = 5
    _ST_TTY_NOLOGIN = 6
    _ST_TTY_OPTION = 7
    _ST_TTY_HOTKEY = 8

    # Raw strings keep the regex escapes (\s, \$) from being treated as
    # (invalid) Python string escapes; the pattern text itself is unchanged.
    _re_pat_login = r"(?P<login>ogin:\s*$)"

    # Each pattern carries a named group; the state-machines below dispatch
    # on the group name reported by read_prompt().
    _RE_PAT = [
        r"(?P<loader>oader>\s*$)",
        _re_pat_login,
        r"(?P<passwd>assword:\s*$)",
        r"(?P<badpasswd>ogin incorrect)",
        r"(?P<netconf_closed><!-- session end at .*-->\s*)",
        r"(?P<shell>%|#|(~\$)\s*$)",
        r'(?P<cli>[^\-"]>\s*$)',
        r"(?P<option>Enter your option:\s*$)",
        r"(?P<hotkey>connection: <CTRL>Z)",
    ]

    # -----------------------------------------------------------------------
    # CONSTRUCTOR
    # -----------------------------------------------------------------------

    def __init__(self, **kvargs):
        """
        :kvargs['user']:
          defaults to 'root'

        :kvargs['passwd']:
          defaults to empty; NOOB Junos device there is
          no root password initially

        :kvargs['cs_user']:
          stored as ``self.cs_user``; not used by this class
          (presumably console-server credentials -- confirm with caller)

        :kvargs['cs_passwd']:
          stored as ``self.cs_passwd``; not used by this class

        :kvargs['attempts']:
          the total number of login attempts thru the login
          state-machine; a missing or falsy value falls back to
          ``LOGIN_RETRY``

        :kvargs['console_has_banner']:
          when truthy, a device that shows no recognised prompt is
          nudged with a newline (after a pause) instead of
          ``<close-session/>``; defaults to False

        :kvargs['huge_tree']:
          stored as ``self._huge_tree``; defaults to False
        """
        # logic args
        # 'host' is looked up on the instance itself, so it is only set when
        # something assigned self.host before this constructor ran.
        self.hostname = self.__dict__.get("host")
        self.user = kvargs.get("user", "root")
        self.passwd = kvargs.get("passwd", "")
        self.cs_user = kvargs.get("cs_user")
        self.cs_passwd = kvargs.get("cs_passwd")
        self.login_attempts = kvargs.get("attempts") or self.LOGIN_RETRY
        self.console_has_banner = kvargs.get("console_has_banner") or False
        self._huge_tree = kvargs.get("huge_tree", False)

        # misc setup
        self.nc = tty_netconf(self)
        self.state = self._ST_INIT
        self._badpasswd = 0  # number of "login incorrect" prompts seen
        self._loader = 0  # number of loader prompts seen

    @property
    def tty_name(self):
        """Name of the TTY, as stored in ``self._tty_name``."""
        return self._tty_name

    # -----------------------------------------------------------------------
    # Login/logout
    # -----------------------------------------------------------------------

    def login(self):
        """
        open the TTY connection and login.  once the login is successful,
        start the NETCONF XML API process

        :returns: True
        :raises RuntimeError: when the login state-machine gives up
        :raises ConnectAuthError: on repeated bad username/password
        """
        logger.info("TTY: connecting to TTY:{} ...".format(self.tty_name))
        self._tty_open()

        logger.info("TTY: logging in......")

        self.state = self._ST_INIT
        self._login_state_machine()

        # now start NETCONF XML
        logger.info("TTY: OK.....starting NETCONF")
        self.nc.open(at_shell=self.at_shell)
        return True

    def logout(self):
        """
        cleanly logout of the TTY

        :returns: True
        :raises RuntimeError: when the logout state-machine gives up
        """
        logger.info("logout: logging out.....")
        self.nc.close()
        self._logout_state_machine()
        return True

    # ---------------------------------------------------------------------
    # TTY logout state-machine
    # ---------------------------------------------------------------------

    def _logout_state_machine(self, attempt=0):
        """
        Drive the TTY back to the login prompt, one prompt per pass.

        :attempt: number of passes already made; the 11th pass
          (attempt == 10) raises instead of reading again

        :returns: True once the login prompt is seen (the TTY is closed
          first) or when read_prompt() reports no match at all
        :raises RuntimeError: "logout_sm_failure" after 10 passes
        """
        if attempt == 10:
            raise RuntimeError("logout_sm_failure")

        _prompt, found = self.read_prompt()

        def _ev_login():
            # back at login prompt, so we are cleanly done!
            self._tty_close()

        def _ev_shell():
            self.write("exit")

        def _ev_cli():
            self.write("exit")

        # Connection closed by foreign host
        def _ev_netconf_closed():
            return True

        _ev_tbl = {
            "login": _ev_login,
            "shell": _ev_shell,
            "cli": _ev_cli,
            "netconf_closed": _ev_netconf_closed,
        }

        # hack for now
        # in case of telnet to management port, after writing exit on console
        # it exits completely and returns None
        if found is None:
            return True

        # Prompts with no logout handler (e.g. passwd, loader, option) are
        # ignored and retried, so a persistent one ends in logout_sm_failure
        # rather than a stray KeyError.
        handler = _ev_tbl.get(found)
        if handler is not None:
            handler()

        if found == "login":
            return True

        sleep(1)
        return self._logout_state_machine(attempt=attempt + 1)

    # -----------------------------------------------------------------------
    # TTY login state-machine
    # -----------------------------------------------------------------------
    def _login_state_machine(self, attempt=0):
        """
        Drive the TTY from whatever prompt it shows to a logged-in
        shell or CLI, one prompt per pass.

        On success ``self.state`` is ``_ST_DONE`` and ``self.at_shell``
        tells whether the session ended at a shell (True) or CLI (False).

        :attempt: number of passes already made

        :returns: True once the state-machine reaches ``_ST_DONE``
        :raises RuntimeError: "login_sm_failure" when ``attempt`` reaches
          ``self.login_attempts``, or when the loader prompt is seen a
          second time
        :raises ConnectAuthError: on the second "login incorrect"
        """
        if self.login_attempts == attempt:
            raise RuntimeError("login_sm_failure")

        _prompt, found = self.read_prompt()

        def _ev_loader():
            self.state = self._ST_LOADER
            # Count the loader prompt *before* booting and re-entering the
            # state-machine; otherwise a device stuck in the loader would
            # recurse forever and this guard would never be reached.
            self._loader += 1
            if self._loader >= 2:
                raise RuntimeError("probably corrupted image, stuck in loader")
            self.write("boot")
            self.write("\n")
            sleep(self.LOADER_BOOT_WAIT)
            # fresh attempt budget once the device has had time to boot
            self._login_state_machine(attempt=0)

        def _ev_login():
            self.state = self._ST_LOGIN
            self.write(self.user)

        def _ev_passwd():
            self.state = self._ST_PASSWD
            self.write(self.passwd)

        def _ev_bad_passwd():
            self.state = self._ST_BAD_PASSWD
            self.write("\n")
            self._badpasswd += 1
            if self._badpasswd == 2:
                raise EzErrors.ConnectAuthError(self, "Bad username/password")
            # return through and try again ... could have been
            # prior failed attempt

        def _ev_tty_nologin():
            if self._ST_INIT == self.state:
                # assume we're in a hung state, i.e. we don't see
                # a login prompt for whatever reason
                self.state = self._ST_TTY_NOLOGIN
                if self.console_has_banner:
                    # if console connection has a banner or warning,
                    # use this hack
                    sleep(5)
                    self.write("\n")
                else:
                    # @@@ this is still a hack - used by default
                    self.write("<close-session/>")

        def _ev_shell():
            if self.state == self._ST_INIT:
                # this means that the shell was left
                # open.  probably not a good thing,
                # so issue a logging message, but move on.
                logger.warning("login_warn: Shell login was open!!")

            self.at_shell = True
            self.state = self._ST_DONE
            # if we are here, then we are done

        def _ev_cli():
            if self.state == self._ST_INIT:
                # a CLI prompt on the very first read means a session was
                # left open.  probably not a good thing, so issue a
                # logging message and pause before carrying on.
                logger.warning("login_warn: waiting on TTY..... ")
                sleep(5)

            self.at_shell = False
            self.state = self._ST_DONE

        def _ev_option():
            self.state = self._ST_TTY_OPTION
            self.write("1")

        def _ev_hot_key():
            self.state = self._ST_TTY_HOTKEY
            self.write("\n")

        _ev_tbl = {
            "loader": _ev_loader,
            "login": _ev_login,
            "passwd": _ev_passwd,
            "badpasswd": _ev_bad_passwd,
            "shell": _ev_shell,
            "cli": _ev_cli,
            "option": _ev_option,
            "hotkey": _ev_hot_key,
        }

        # anything unrecognised (including no match) is treated as "no login"
        _ev_tbl.get(found, _ev_tty_nologin)()

        if self.state == self._ST_DONE:
            return True

        # not done yet: loop the event again, and propagate its result so
        # every successful path returns True
        return self._login_state_machine(attempt + 1)