"""Ciena SAOS support."""
import time
import re
import os
from netmiko.base_connection import BaseConnection
from netmiko.scp_handler import BaseFileTransfer


class CienaSaosBase(BaseConnection):
    """
    Ciena SAOS support.

    Implements methods for interacting Ciena Saos devices.

    Disables enable(), check_enable_mode(), config_mode() and
    check_config_mode()
    """

    def session_preparation(self):
        """Prepare the session: set the base prompt and turn off paging."""
        self._test_channel_read()
        self.set_base_prompt()
        self.disable_paging(command="system shell session set more off")
        # Clear the read buffer
        time.sleep(0.3 * self.global_delay_factor)
        self.clear_buffer()

    def _enter_shell(self):
        """Enter the Bourne Shell.

        Raises ValueError when the device answers with "SHELL PARSER FAILURE".
        """
        output = self.send_command("diag shell", expect_string=r"[$#>]")
        if "SHELL PARSER FAILURE" in output:
            msg = "SCP support on Ciena SAOS requires 'diag shell' permissions"
            raise ValueError(msg)
        return output

    def _return_cli(self):
        """Return to the Ciena SAOS CLI."""
        return self.send_command("exit", expect_string=r"[>]")

    def check_enable_mode(self, *args, **kwargs):
        """No enable mode on Ciena SAOS."""
        return True

    def enable(self, *args, **kwargs):
        """No enable mode on Ciena SAOS."""
        return ""

    def exit_enable_mode(self, *args, **kwargs):
        """No enable mode on Ciena SAOS."""
        return ""

    def check_config_mode(self, check_string=">", pattern=""):
        """No config mode on Ciena SAOS."""
        return False

    def config_mode(self, config_command=""):
        """No config mode on Ciena SAOS."""
        return ""

    def exit_config_mode(self, exit_config=""):
        """No config mode on Ciena SAOS."""
        return ""

    def save_config(self, cmd="configuration save", confirm=False, confirm_response=""):
        """Saves Config.

        Only ``cmd`` is sent; ``confirm`` and ``confirm_response`` are accepted
        but not used by this implementation.
        """
        return self.send_command(command_string=cmd)


class CienaSaosSSH(CienaSaosBase):
    pass


class CienaSaosTelnet(CienaSaosBase):
    def __init__(self, *args, **kwargs):
        # Use "\r\n" as the enter sequence unless the caller supplied a
        # non-None default_enter.
        default_enter = kwargs.get("default_enter")
        kwargs["default_enter"] = "\r\n" if default_enter is None else default_enter
        super().__init__(*args, **kwargs)


class CienaSaosFileTransfer(BaseFileTransfer):
    """Ciena SAOS SCP File Transfer driver."""

    def __init__(
        self,
        ssh_conn,
        source_file,
        dest_file,
        file_system="",
        direction="put",
        **kwargs,
    ):
        # An empty file_system falls back to the per-user directory under /tmp.
        if file_system == "":
            file_system = f"/tmp/users/{ssh_conn.username}"
        super().__init__(
            ssh_conn=ssh_conn,
            source_file=source_file,
            dest_file=dest_file,
            file_system=file_system,
            direction=direction,
            **kwargs,
        )

    def remote_space_available(self, search_pattern=""):
        """
        Return space available on Ciena SAOS (in bytes).

        Output should only have the file-system that matches {self.file_system}

        Filesystem           1K-blocks      Used Available Use% Mounted on
        tmpfs                  1048576       648   1047928   0% /tmp

        ``search_pattern`` is accepted but not used by this implementation.

        Raises ValueError (carrying the command and its raw output) whenever
        the output does not have the expected header/data-row shape.
        """
        remote_cmd = f"file vols -P {self.file_system}"
        remote_output = self.ssh_ctl_chan.send_command_expect(remote_cmd)
        remote_output = remote_output.strip()
        err_msg = (
            f"Parsing error, unexpected output from {remote_cmd}:\n{remote_output}"
        )

        # First line is the header; everything after it is the file-system row,
        # which may itself be wrapped across more than one physical line.
        output_lines = remote_output.splitlines()
        if len(output_lines) < 2:
            raise ValueError(err_msg)
        header_line = output_lines[0]
        filesystem_line = "\n".join(output_lines[1:])

        header_fields = header_line.split()
        if (
            len(header_fields) < 4
            or header_fields[0] != "Filesystem"
            or "Avail" not in header_fields[3]
        ):
            # Filesystem           1K-blocks      Used Available Use% Mounted on
            raise ValueError(err_msg)

        # Normalize output - in certain outputs ciena will line wrap (this fixes that)
        # Strip the extra newline
        # /dev/mapper/EN--VOL-config
        #                  4096      1476      2620  36% /etc/hosts
        filesystem_line = re.sub(r"(^\S+$)\n", r"\1", filesystem_line, flags=re.M)

        # Checks to make sure what was returned is what we expect
        row_fields = filesystem_line.split()
        if len(row_fields) != 6:
            raise ValueError(err_msg)
        _, k_blocks, used, space_avail, _, _ = row_fields
        try:
            for integer_check in (k_blocks, used, space_avail):
                int(integer_check)
        except ValueError as err:
            raise ValueError(err_msg) from err

        # The reported value is in 1K-blocks; convert to bytes.
        return int(space_avail) * 1024

    def check_file_exists(self, remote_cmd=""):
        """Check if the dest_file already exists on the file system (return boolean).

        For direction "put" the remote device is queried; for "get" the local
        path is checked. Any other direction falls through and returns None.

        Raises ValueError when the remote output contains neither "ERROR" nor
        the expected file path.
        """
        if self.direction == "put":
            if not remote_cmd:
                remote_cmd = f"file ls {self.file_system}/{self.dest_file}"
            remote_out = self.ssh_ctl_chan.send_command_expect(remote_cmd)
            search_string = re.escape(f"{self.file_system}/{self.dest_file}")
            if "ERROR" in remote_out:
                return False
            elif re.search(search_string, remote_out):
                return True
            else:
                raise ValueError("Unexpected output from check_file_exists")
        elif self.direction == "get":
            return os.path.exists(self.dest_file)

    def remote_file_size(self, remote_cmd="", remote_file=None):
        """Get the file size of the remote file.

        When ``remote_file`` is None it is derived from the transfer direction
        (dest_file for "put", source_file for "get").

        Raises IOError when the device reports the file is missing and
        ValueError when the listing cannot be parsed.
        """
        if remote_file is None:
            if self.direction == "put":
                remote_file = self.dest_file
            elif self.direction == "get":
                remote_file = self.source_file

        remote_file = f"{self.file_system}/{remote_file}"

        if not remote_cmd:
            remote_cmd = f"file ls -l {remote_file}"

        remote_out = self.ssh_ctl_chan.send_command_expect(remote_cmd)

        if "No such file or directory" in remote_out:
            raise IOError("Unable to find file on remote system")

        escape_file_name = re.escape(remote_file)
        pattern = r"^.* ({}).*$".format(escape_file_name)
        match = re.search(pattern, remote_out, flags=re.M)
        if match:
            # Format: -rw-r--r--  1 pyclass  wheel  12 Nov  5 19:07 /var/tmp/test3.txt
            line = match.group(0)
            # Fifth whitespace-separated column is the size.
            file_size = line.split()[4]
            return int(file_size)

        raise ValueError(
            "Search pattern not found for remote file size during SCP transfer."
        )

    def remote_md5(self, base_cmd="", remote_file=None):
        """Calculate remote MD5 and returns the hash.

        This command can be CPU intensive on the remote device.

        The checksum command is run from the diag shell; the session is always
        returned to the SAOS CLI afterwards, even if the command fails.
        """
        if base_cmd == "":
            base_cmd = "md5sum"
        if remote_file is None:
            if self.direction == "put":
                remote_file = self.dest_file
            elif self.direction == "get":
                remote_file = self.source_file

        remote_md5_cmd = f"{base_cmd} {self.file_system}/{remote_file}"

        # _enter_shell() stays outside the try: if it raises we never entered
        # the shell, so there is nothing to exit from.
        self.ssh_ctl_chan._enter_shell()
        try:
            dest_md5 = self.ssh_ctl_chan.send_command(
                remote_md5_cmd, expect_string=r"[$#>]"
            )
        finally:
            self.ssh_ctl_chan._return_cli()
        dest_md5 = self.process_md5(dest_md5, pattern=r"([0-9a-f]+)\s+")
        return dest_md5

    def enable_scp(self, cmd="system server scp enable"):
        """Delegate to the parent enable_scp with the SAOS enable command."""
        return super().enable_scp(cmd=cmd)

    def disable_scp(self, cmd="system server scp disable"):
        """Delegate to the parent disable_scp with the SAOS disable command."""
        return super().disable_scp(cmd=cmd)