1from time import sleep
2import logging
3
4from jnpr.junos import exception as EzErrors
5from jnpr.junos.transport.tty_netconf import tty_netconf
6
# Module-level logger shared by the TTY transport code; the logger name is a
# fixed string rather than being derived from ``__name__``.
logger = logging.getLogger("jnpr.junos.tty")

# Names exported by ``from <this module> import *``.
__all__ = ["Terminal"]
10
11# =========================================================================
12# Terminal class
13# =========================================================================
14
15
class Terminal(object):

    """
    Terminal is used to bootstrap Junos New Out of the Box (NOOB) device
    over the CONSOLE port. The general use-case is to setup the minimal
    configuration so that the device is IP reachable using SSH
    and NETCONF for remote management.

    Serial is needed for Junos devices that do not support
    the DHCP 'auto-installation' or 'ZTP' feature; i.e. you *MUST*
    do the NOOB configuration via the CONSOLE.

    Serial is also useful for situations even when the Junos
    device supports auto-DHCP, but is not an option due to the
    specific situation.

    This class calls ``_tty_open()``, ``_tty_close()``, ``read_prompt()``
    and ``write()`` and reads ``_tty_name``, none of which are defined here;
    they are expected to be provided by a concrete transport subclass.
    """

    TIMEOUT = 0.2  # serial readline timeout, seconds
    EXPECT_TIMEOUT = 10  # total read timeout, seconds
    LOGIN_RETRY = 20  # total number of passes thru login state-machine
    LOADER_BOOT_WAIT = 300  # seconds to wait after "boot" at the loader prompt

    # states recorded in ``self.state`` by the login state-machine
    _ST_INIT = 0
    _ST_LOADER = 1
    _ST_LOGIN = 2
    _ST_PASSWD = 3
    _ST_DONE = 4
    _ST_BAD_PASSWD = 5
    _ST_TTY_NOLOGIN = 6
    _ST_TTY_OPTION = 7
    _ST_TTY_HOTKEY = 8

    # Raw strings are used so that "\s" and "\$" reach the regex engine
    # without relying on Python's deprecated invalid-escape pass-through.
    _re_pat_login = r"(?P<login>ogin:\s*$)"

    # Prompt patterns; each named group is the event name that the
    # state-machines below dispatch on.
    _RE_PAT = [
        r"(?P<loader>oader>\s*$)",
        _re_pat_login,
        r"(?P<passwd>assword:\s*$)",
        r"(?P<badpasswd>ogin incorrect)",
        r"(?P<netconf_closed><!-- session end at .*-->\s*)",
        # NOTE(review): alternation precedence means '%' and '#' match
        # anywhere in the text and only '~$' is anchored to the end --
        # confirm whether that is intended before tightening it.
        r"(?P<shell>%|#|(~\$)\s*$)",
        r'(?P<cli>[^\-"]>\s*$)',
        r"(?P<option>Enter your option:\s*$)",
        r"(?P<hotkey>connection: <CTRL>Z)",
    ]

    # -----------------------------------------------------------------------
    # CONSTRUCTOR
    # -----------------------------------------------------------------------

    def __init__(self, **kvargs):
        """
        :kvargs['user']:
          defaults to 'root'

        :kvargs['passwd']:
          defaults to empty; NOOB Junos device there is
          no root password initially

        :kvargs['cs_user']:
          stored as ``self.cs_user``; defaults to None
          (presumably the console-server user name -- not used in this class)

        :kvargs['cs_passwd']:
          stored as ``self.cs_passwd``; defaults to None
          (presumably the console-server password -- not used in this class)

        :kvargs['attempts']:
          the total number of login attempts thru the login
          state-machine; a missing or falsy value selects ``LOGIN_RETRY``

        :kvargs['console_has_banner']:
          when truthy, a hung console is nudged with a newline after a
          short wait instead of sending ``<close-session/>``

        :kvargs['huge_tree']:
          stored as ``self._huge_tree``; defaults to False
        """
        # logic args
        # "host" is only present if it was already stored on the instance
        # (e.g. by a subclass) before this constructor ran.
        self.hostname = self.__dict__.get("host")
        self.user = kvargs.get("user", "root")
        self.passwd = kvargs.get("passwd", "")
        self.cs_user = kvargs.get("cs_user")
        self.cs_passwd = kvargs.get("cs_passwd")
        self.login_attempts = kvargs.get("attempts") or self.LOGIN_RETRY
        self.console_has_banner = kvargs.get("console_has_banner") or False
        self._huge_tree = kvargs.get("huge_tree", False)

        # misc setup
        self.nc = tty_netconf(self)
        self.state = self._ST_INIT
        self._badpasswd = 0  # number of "login incorrect" events seen
        self._loader = 0  # number of loader prompts seen

    @property
    def tty_name(self):
        """Return ``self._tty_name`` (not assigned anywhere in this class)."""
        return self._tty_name

    # -----------------------------------------------------------------------
    # Login/logout
    # -----------------------------------------------------------------------

    def login(self):
        """
        open the TTY connection and login.  once the login is successful,
        start the NETCONF XML API process

        :returns: True

        Any exception raised by the login state-machine propagates.
        """
        logger.info("TTY: connecting to TTY:%s ...", self.tty_name)
        self._tty_open()

        logger.info("TTY: logging in......")

        self.state = self._ST_INIT
        self._login_state_machine()

        # now start NETCONF XML
        logger.info("TTY: OK.....starting NETCONF")
        self.nc.open(at_shell=self.at_shell)
        return True

    def logout(self):
        """
        cleanly logout of the TTY: close the NETCONF session, then run the
        logout state-machine.

        :returns: True
        """
        logger.info("logout: logging out.....")
        self.nc.close()
        self._logout_state_machine()
        return True

    # -----------------------------------------------------------------------
    # TTY logout state-machine
    # -----------------------------------------------------------------------

    def _logout_state_machine(self, attempt=0):
        """
        Read the current prompt and react to it until the login prompt is
        seen again or nothing recognizable is read.

        :param attempt: number of passes already made

        :returns: True
        :raises RuntimeError: "logout_sm_failure" once ``attempt`` reaches 10
        """
        if attempt == 10:
            raise RuntimeError("logout_sm_failure")

        prompt, found = self.read_prompt()

        def _ev_login():
            # back at login prompt, so we are cleanly done!
            self._tty_close()

        def _ev_shell():
            self.write("exit")

        def _ev_cli():
            self.write("exit")

        # Connection closed by foreign host
        def _ev_netconf_closed():
            return True

        _ev_tbl = {
            "login": _ev_login,
            "shell": _ev_shell,
            "cli": _ev_cli,
            "netconf_closed": _ev_netconf_closed,
        }

        # hack for now
        # in case of telnet to management port, after writing exit on console
        # it exits completely and returns None
        if found is None:
            return True

        _ev_tbl[found]()

        if found == "login":
            return True

        # not back at the login prompt yet: pause, then look again
        sleep(1)
        return self._logout_state_machine(attempt=attempt + 1)

    # -----------------------------------------------------------------------
    # TTY login state-machine
    # -----------------------------------------------------------------------

    def _login_state_machine(self, attempt=0):
        """
        Read the current prompt and react to it, repeating until a shell or
        CLI prompt is reached.  On success ``self.at_shell`` tells which of
        the two was found and ``self.state`` is ``_ST_DONE``.

        :param attempt: number of passes already made

        :returns: True once a shell or CLI prompt has been reached

        :raises RuntimeError:
          "login_sm_failure" once ``attempt`` reaches ``self.login_attempts``;
          also raised when the loader prompt is seen a second time
        :raises ConnectAuthError:
          when "login incorrect" is seen a second time
        """
        if self.login_attempts == attempt:
            raise RuntimeError("login_sm_failure")

        prompt, found = self.read_prompt()

        def _ev_loader():
            self.state = self._ST_LOADER
            # Count the sighting *before* rebooting, so a device that drops
            # back to the loader is reported instead of being rebooted
            # (and waited on) over and over.
            self._loader += 1
            if self._loader >= 2:
                raise RuntimeError("probably corrupted image, stuck in loader")
            self.write("boot")
            self.write("\n")
            sleep(self.LOADER_BOOT_WAIT)
            # the device has rebooted: start over with a fresh attempt budget
            self._login_state_machine(attempt=0)

        def _ev_login():
            self.state = self._ST_LOGIN
            self.write(self.user)

        def _ev_passwd():
            self.state = self._ST_PASSWD
            self.write(self.passwd)

        def _ev_bad_passwd():
            self.state = self._ST_BAD_PASSWD
            self.write("\n")
            self._badpasswd += 1
            if self._badpasswd == 2:
                raise EzErrors.ConnectAuthError(self, "Bad username/password")
            # return through and try again ... could have been
            # prior failed attempt

        def _ev_tty_nologin():
            # fallback for anything that is not a recognized prompt
            if self._ST_INIT == self.state:
                # assume we're in a hung state, i.e. we don't see
                # a login prompt for whatever reason
                self.state = self._ST_TTY_NOLOGIN
                if self.console_has_banner:
                    # if console connection has a banner or warning,
                    # use this hack
                    sleep(5)
                    self.write("\n")
                else:
                    # @@@ this is still a hack - used by default
                    self.write("<close-session/>")

        def _ev_shell():
            if self.state == self._ST_INIT:
                # this means that the shell was left
                # open.  probably not a good thing,
                # so issue a logging message, but move on.
                logger.warning("login_warn: Shell login was open!!")

            self.at_shell = True
            self.state = self._ST_DONE
            # if we are here, then we are done

        def _ev_cli():
            if self.state == self._ST_INIT:
                # a CLI prompt was already there on the first read, i.e. the
                # session was left open.  probably not a good thing, so
                # issue a logging message and give the TTY time to settle.
                logger.warning("login_warn: waiting on TTY..... ")
                sleep(5)

            self.at_shell = False
            self.state = self._ST_DONE

        def _ev_option():
            self.state = self._ST_TTY_OPTION
            self.write("1")

        def _ev_hot_key():
            self.state = self._ST_TTY_HOTKEY
            self.write("\n")

        _ev_tbl = {
            "loader": _ev_loader,
            "login": _ev_login,
            "passwd": _ev_passwd,
            "badpasswd": _ev_bad_passwd,
            "shell": _ev_shell,
            "cli": _ev_cli,
            "option": _ev_option,
            "hotkey": _ev_hot_key,
        }

        _ev_tbl.get(found, _ev_tty_nologin)()

        if self.state == self._ST_DONE:
            return True

        # not done yet: loop the event again, handing the final result
        # back to the caller
        return self._login_state_machine(attempt + 1)
276