# Copyright (c) 2012, 2021, Oracle and/or its affiliates.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License, version 2.0,
# as published by the Free Software Foundation.
#
# This program is also distributed with certain software (including
# but not limited to OpenSSL) that is licensed under separate terms,
# as designated in a particular file or component or in included license
# documentation.  The authors of MySQL hereby grant you an additional
# permission to link the program and your derivative works with the
# separately licensed software that they have included with MySQL.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License, version 2.0, for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA

"""Provides specialization of ABClusterHost for remote hosts using Paramiko."""

import errno
import stat
import util
import time
import paramiko
import ntpath
import logging
import os.path
import tempfile
import contextlib
import posixpath

import clusterhost
from clusterhost import ABClusterHost

_logger = logging.getLogger(__name__)


def quote_if_contains_space(s):
    """Return s wrapped in double quotes if it contains a space, else s unchanged.

    Only the space character triggers quoting; quotes already present in s
    are not escaped.
    """
    return '"' + s + '"' if ' ' in s else s


class RemoteExecException(clusterhost.ExecException):
    """Exception type thrown whenever os-command execution fails on
    a remote host.
    """

    def __init__(self, hostname, cmd, exitstatus, out):
        """Record the failing command and drain its output.

        hostname - host the command was run on
        cmd - the command line that was executed
        exitstatus - exit status reported for the command
        out - file-like object; it is read to the end here and the
              result stored in self.out
        """
        self.hostname = hostname
        self.cmd = cmd
        self.exitstatus = exitstatus
        self.out = out.read()

    def __str__(self):
        return 'Command `{self.cmd}\', running on {self.hostname} exited with {self.exitstatus}:\n{self.out}'.format(self=self)


class RemoteClusterHost(ABClusterHost):
    """Implements the ABClusterHost interface for remote hosts. Wraps a paramiko.SSHClient and uses
    this to perform tasks on the remote host."""

    def __init__(self, host, username=None, password=None):
        """Connect to host over SSH and open an SFTP session on that connection.

        host - name of the remote host
        username - user to log in as (passed straight to SSHClient.connect)
        password - password for that user (passed straight to SSHClient.connect)
        """
        # Name the class explicitly: super(type(self), self) recurses forever
        # as soon as this class is subclassed.
        super(RemoteClusterHost, self).__init__()
        self.host = host
        self.user = username
        self.pwd = password
        c = paramiko.SSHClient()
        c.load_system_host_keys()
        # NOTE(review): unknown host keys are accepted automatically here; the
        # legacy code kept below carries a TODO asking for user acceptance.
        c.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        c.connect(hostname=self.host, username=self.user, password=self.pwd)
        self.__client = c
        self.__sftp = c.open_sftp()

    def close(self):
        """Close the connections to the remote host (forwards to drop())."""
        self.drop()

    @property
    def client(self):
        """The paramiko.SSHClient created in __init__ (None after drop())."""
        return self.__client

    @property
    def sftp(self):
        """The SFTP session opened in __init__ (None after drop())."""
        return self.__sftp

#    @property
#    def client(self):
#        """"A freshly connected SSHClient object."""
#        if self.__client != None:
#            if self.__sftp != None:
#                self.__sftp.close()
#                self.__sftp = None
#            self.__client.close()

#        c = paramiko.SSHClient()
#        c.load_system_host_keys()

#        # TODO - we need user acceptance for this by button in the frontend
#        c.set_missing_host_key_policy(paramiko.AutoAddPolicy())
#        ak = 'H:\\.ssh\\known_hosts'
#        if os.path.exists(ak):
#            _logger.debug('Loading additional host keys from %s', ak)
#            c.load_host_keys(filename=ak)
#        else:
#            _logger.debug('File %s does not exist here', ak)

#        c.connect(hostname=self.host, username=self.user, password=self.pwd)
#        self.__client = c
#        return c

#    @property
#    def sftp(self):
#        """"An SFTPClient object to this host. It and its SSHClient
#        object will be created on demand."""
#        if self.__sftp != None:
#            self.__sftp.close()

#        self.__sftp = self.client.open_sftp()
#        return self.__sftp

    def _get_system_tuple(self):
        """Return a (system, processor) tuple describing the remote host.

        Running '#' is used as a probe: if it fails the host is assumed to be
        Windows and cmd.exe is queried, otherwise 'uname -sp' is used. Whatever
        the probe printed is stripped once from the front of the uname output.
        """
        preamble = None
        system = None
        processor = None
        try:
            preamble = self.exec_blocking(['#'])
        except Exception:
            # Was a bare except; narrowed so that KeyboardInterrupt/SystemExit
            # are no longer mistaken for "this is a Windows host".
            _logger.debug('executing # failed - assuming Windows...')
            (system, processor) = self.exec_blocking(['cmd.exe', '/c', 'echo', '%OS%', '%PROCESSOR_ARCHITECTURE%']).split(' ')
            if 'Windows' in system:
                system = 'Windows'
        else:
            _logger.debug('preamble=' + preamble)
            raw_uname = self.exec_blocking(['uname', '-sp'])
            _logger.debug('raw_uname=' + raw_uname)
            uname = raw_uname.replace(preamble, '', 1)
            _logger.debug('uname=' + uname)
            (system, processor) = uname.split(' ')
            if 'CYGWIN' in system:
                system = 'CYGWIN'
        return (system, processor.strip())

    def _exec_pkg_cmdv(self, cmdv):
        """For remote hosts the binary is first copied over using sftp.

        cmdv - command vector; cmdv[0] is the local path of the binary. It is
               uploaded under its base name, made rwx for the owner and then
               executed from the sftp working directory.
        """
        _logger.debug("%s", str(self.sftp.listdir()))
        hi = os.path.basename(cmdv[0])
        self.sftp.put(cmdv[0], hi)
        self.sftp.chmod(hi, stat.S_IRWXU)
        # NOTE(review): cmdv[1:-1] drops the LAST argument as well as the
        # binary. Kept as-is because callers may rely on it - confirm whether
        # cmdv[1:] was intended.
        return self.exec_cmdv([self.path_module.join('.', hi)] + cmdv[1:-1])

    def _sftpify(self, path):
        """Since sftp treats all path names as relative to its root we must
        convert absolute paths before using them with sftp. As quick-fix we
        assume that the sftp root is equal to the drive letter of all absolute
        paths used. If it isn't the sftp operations will fail."""
        return self.path_module.splitdrive(path)[1]

    def open(self, filename, mode='r'):
        """Forward to paramiko.SFTPClient.open for remote hosts.
        Wrap in contextlib.closing so that clients can use
        with-statements on it."""
        return contextlib.closing(self.sftp.open(self._sftpify(filename), mode))

    def drop(self, paths=None):
        """Close open connections and remove files.
        paths - list of files to remove from host before closing connection
        """
        # Explicit loop instead of map(): map() is lazy on Python 3, so the
        # removals would silently never happen there.
        for path in paths or ():
            self.rm_r(path)
        if self.__sftp:
            self.__sftp.close()
            self.__sftp = None
        if self.__client:
            self.__client.close()
            self.__client = None

    def file_exists(self, path):
        """Test for the existence of a file on the remote host. If the file actually exists,
        its stat object is returned, otherwise None.
        path - file to check the existence of
        """
        try:
            return self.sftp.stat(self._sftpify(path))
        except IOError as ioerr:
            if ioerr.errno == errno.ENOENT:
                return None
            # Anything other than "no such file" is a real failure.
            _logger.debug('stat failure on ' + path)
            raise

    def list_dir(self, path):
        """List the files in a directory on the remote host. Forwards to
        SFTPClient.listdir(), but also warns about empty results that may be caused
        by paramiko not reporting missing execute permission on the directory
        correctly.
        path - directory to list
        """
        sftp_path = self._sftpify(path)
        content = self.sftp.listdir(sftp_path)
        if not content:
            # stat the same (sftp-converted) path that was listed; the raw
            # path was used here before, unlike every other sftp call.
            m = stat.S_IMODE(self.sftp.stat(sftp_path).st_mode)
            for role in ['USR', 'GRP', 'OTH']:
                mask = util.get_fmask('R', role) | util.get_fmask('X', role)
                if (m & mask) != mask:
                    _logger.debug('Directory ' + path + ' does not have both read and execute permission for ' + role + '.\nIf you depend on ' + role + ' for access, the empty directory listing may not be correct')

        return content

    def mkdir_p(self, path):
        """Provides mkdir -p type functionality on the remote host. That is,
        all missing parent directories are also created. If the directory we are trying to
        create already exists, we silently do nothing. If path or any of its parents is not
        a directory an exception is raised.
        path - directory to create on remote host
        """
        _logger.debug('mkdir_p(' + path + ')')
        path = self._sftpify(path)
        pa = self.file_exists(path)
        if pa is not None:
            if not util.is_dir(pa):
                raise Exception(self.host + ':' + path + ' is not a directory')
            return
        # Need to use normpath here since dirname of a directory with a trailing slash
        # is the directory without a slash (a dirname bug?)
        sd = ntpath.splitdrive(path)
        _logger.debug('sd=' + str(sd))
        if sd[1] == '':
            _logger.debug('path=' + path + ' is a drive letter. Returning...')
            return

        np = self.path_module.normpath(path)
        parent = self.path_module.dirname(np)
        # Guards against recursing on the same path forever.
        assert parent != path
        self.mkdir_p(parent)
        self.sftp.mkdir(np)

    def rm_r(self, path):
        """Provides rm -r type functionality on the remote host. That is, all files and
        directories are removed recursively.
        path - file or directory to remove
        """
        path = self._sftpify(path)
        if util.is_dir(self.sftp.stat(path)):
            for f in self.sftp.listdir(path):
                # Use the imported posixpath module; 'self.posixpath' is not
                # an attribute defined anywhere in this class.
                self.rm_r(posixpath.join(path, f))
            self.sftp.rmdir(path)
        else:
            self.sftp.remove(path)

    def _exec_cmdln(self, cmdln, procCtrl, stdinFile):
        """Execute an OS command line (as a single string) on the remote host.
        cmdln - complete command line of the OS command
        procCtrl - procCtrl object from message which controls how the process
        is started (blocking vs non-blocking and output reporting)
        stdinFile - name of a remote file whose contents are sent to the
        command's stdin, or None

        Returns the command output when procCtrl asks to wait for completion,
        otherwise None. Raises RemoteExecException on a non-zero exit status
        (unless it equals procCtrl's 'noRaise' value), or when a command that
        was not waited for has already exited.
        """

        contents = None
        if stdinFile is not None:
            with self.open(stdinFile) as stdin:
                contents = stdin.read()

        with contextlib.closing(self.client.get_transport().open_session()) as chan:
            chan.set_combine_stderr(True)
            _logger.debug('cmdln=' + cmdln)
            chan.exec_command(cmdln)

            if contents is not None:
                _logger.debug('Using supplied stdin from ' + stdinFile + ': ')
                _logger.debug(contents[0:50] + '...')
                chan.sendall(contents)
                chan.shutdown_write()

            if util.get_val(procCtrl, 'waitForCompletion'):
                output = chan.makefile('rb')
                _logger.debug('Waiting for command...')
                # NOTE(review): the exit status is awaited before any output
                # is read; paramiko documents that this can hang when the
                # command produces a lot of output. Order kept unchanged.
                exitstatus = chan.recv_exit_status()
                if exitstatus != 0 and exitstatus != util.get_val(procCtrl, 'noRaise'):
                    raise RemoteExecException(self.host, cmdln, exitstatus, output)
                return output.read()
            else:
                # 'in' instead of has_key(), which only exists on Python 2 dicts.
                if not chan.exit_status_ready() and 'daemonWait' in procCtrl:
                    _logger.debug('Waiting {0} sec for {1}'.format(procCtrl['daemonWait'], cmdln))
                    time.sleep(procCtrl['daemonWait'])
                # A command we were not asked to wait for should still be running.
                if chan.exit_status_ready():
                    output = chan.makefile('rb')
                    raise RemoteExecException(self.host, cmdln, chan.recv_exit_status(), output)

    def _exec_cmdv(self, cmdv, procCtrl, stdinFile):
        """Execute an OS command vector on the remote host.
        cmdv - complete command vector (argv) of the OS command
        procCtrl - procCtrl object from message which controls how the process
        is started (blocking vs non-blocking and output reporting)
        stdinFile - passed through unchanged to _exec_cmdln
        """

        assert isinstance(cmdv, list)
        return self._exec_cmdln(' '.join([quote_if_contains_space(a) for a in cmdv]), procCtrl, stdinFile)

    def execute_command(self, cmdv, inFile=None):
        """Execute an OS command blocking on the remote host, using a
        session on the SSH transport. Returns dict containing the
        'exitstatus', 'out' and 'err' of the process.
        cmdv - complete command vector (argv) of the OS command.
        inFile - File-like object providing stdin to the command.
        """
        cmdln = ' '.join([quote_if_contains_space(a) for a in cmdv])
        _logger.debug('cmdln=' + cmdln)

        with contextlib.closing(self.client.get_transport().open_session()) as chan:
            chan.exec_command(cmdln)
            if inFile:
                chan.sendall(inFile.read())
                chan.shutdown_write()

            # NOTE(review): as in _exec_cmdln, the exit status is awaited
            # before the output streams are read.
            result = {
                'exitstatus': chan.recv_exit_status()
            }
            with contextlib.closing(chan.makefile('rb')) as outFile:
                result['out'] = outFile.read()

            with contextlib.closing(chan.makefile_stderr('rb')) as errFile:
                # A stray trailing comma used to turn this into a 1-tuple,
                # unlike 'out'.
                result['err'] = errFile.read()

        return result