# coding=utf-8
"""
Copyright 2013 LinkedIn Corp. All rights reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import logging
import shlex
import subprocess
import time
from threading import Timer
from naarad.run_steps.run_step import Run_Step
import naarad.naarad_constants as CONSTANTS

logger = logging.getLogger('naarad.run_steps.local_cmd')


class Local_Cmd(Run_Step):
    """
    Class for a local command as run step.
    This type will be most likely used when running workload from the same machine running naarad
    """

    def __init__(self, run_type, run_cmd, call_type, run_order, run_rank, should_wait=True, kill_after_seconds=None):
        """
        Initialise the run step; all arguments are forwarded unchanged to Run_Step.__init__.

        :param run_type: forwarded to Run_Step (semantics defined by the base class)
        :param run_cmd: command line string; it is split with shlex and executed by run()
        :param call_type: forwarded to Run_Step (semantics defined by the base class)
        :param run_order: forwarded to Run_Step (semantics defined by the base class)
        :param run_rank: integer rank, logged when the step starts
        :param should_wait: forwarded to Run_Step (semantics defined by the base class)
        :param kill_after_seconds: if truthy, run() arms a timer that calls kill() after this many seconds
        """
        Run_Step.__init__(self, run_type, run_cmd, call_type, run_order, run_rank, should_wait, kill_after_seconds)
        self.process = None
        # run() inspects self.timer even when no kill timer was requested, so make
        # sure the attribute exists regardless of what the base class sets up.
        self.timer = None

    def run(self):
        """
        Run the command, infer time period to be used in metric analysis phase.

        The subprocess output (stdout and stderr merged) is streamed line by line
        to the module logger. A keyboard interrupt kills the subprocess instead of
        propagating. self.ts_start / self.ts_end record the local wall-clock window.

        :return: None
        """
        cmd_args = shlex.split(self.run_cmd)
        logger.info('Local command RUN-STEP starting with rank %d', self.run_rank)
        logger.info('Running subprocess command with following args: %s', cmd_args)

        # Infer time period for analysis. Assume same timezone between client and servers.
        self.ts_start = time.strftime("%Y-%m-%d %H:%M:%S")
        try:
            self.process = subprocess.Popen(cmd_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=1)
            if self.kill_after_seconds:
                self.timer = Timer(self.kill_after_seconds, self.kill)
                self.timer.start()
            # Using 2nd method here to stream output:
            # http://stackoverflow.com/questions/2715847/python-read-streaming-input-from-subprocess-communicate
            for line in iter(self.process.stdout.readline, b''):
                logger.info(line.strip())
            # Output is exhausted; communicate() waits for exit and closes the pipe.
            self.process.communicate()
        except KeyboardInterrupt:
            logger.warning('Handling keyboard interrupt (Ctrl-C)')
            self.kill()
        finally:
            # Always disarm the kill timer, even when an unexpected exception
            # propagates, so it cannot fire after this step is over.
            if self.timer is not None:
                self.timer.cancel()
            self.ts_end = time.strftime("%Y-%m-%d %H:%M:%S")
        logger.info('subprocess finished')
        logger.info('run_step started at %s and ended at %s', self.ts_start, self.ts_end)

    def kill(self):
        """
        If run_step needs to be killed, this method will be called.

        Sends terminate() first, polls every 0.5 seconds for up to
        CONSTANTS.SECONDS_TO_KILL_AFTER_SIGTERM seconds, then falls back to kill().
        OSError raised while signalling the subprocess is logged, not propagated.

        :return: None
        """
        if self.process is None:
            # Nothing was started (or Popen never returned); there is nothing to signal.
            logger.warning('No subprocess to kill for this run_step')
            return
        try:
            logger.info('Trying to terminating run_step...')
            self.process.terminate()
            time_waited_seconds = 0
            while self.process.poll() is None and time_waited_seconds < CONSTANTS.SECONDS_TO_KILL_AFTER_SIGTERM:
                time.sleep(0.5)
                time_waited_seconds += 0.5
            if self.process.poll() is None:
                logger.warning('Waited %d seconds for run_step to terminate. Killing now....', CONSTANTS.SECONDS_TO_KILL_AFTER_SIGTERM)
                self.process.kill()
        except OSError as e:
            logger.error('Error while trying to kill the subprocess: %s', e)