1# coding=utf-8
2"""
3Copyright 2013 LinkedIn Corp. All rights reserved.
4
5Licensed under the Apache License, Version 2.0 (the "License");
6you may not use this file except in compliance with the License.
7You may obtain a copy of the License at
8
9    http://www.apache.org/licenses/LICENSE-2.0
10
11Unless required by applicable law or agreed to in writing, software
12distributed under the License is distributed on an "AS IS" BASIS,
13WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14See the License for the specific language governing permissions and
15limitations under the License.
16"""
17
18import logging
19import shlex
20import subprocess
21import time
22from threading import Timer
23from naarad.run_steps.run_step import Run_Step
24import naarad.naarad_constants as CONSTANTS
25
26logger = logging.getLogger('naarad.run_steps.local_cmd')
27
28
class Local_Cmd(Run_Step):
  """
  Class for a local command as run step.
  This type will be most likely used when running workload from the same machine running naarad.

  The command is launched as a subprocess, its combined stdout/stderr is streamed line by line
  to the module logger, and the start/end wall-clock timestamps are stored in ``ts_start`` and
  ``ts_end`` as ``YYYY-MM-DD HH:MM:SS`` strings.
  """

  def __init__(self, run_type, run_cmd, call_type, run_order, run_rank, should_wait=True, kill_after_seconds=None):
    """
    Initialize the run step; every argument is forwarded unchanged to Run_Step.__init__.

    :param run_type: forwarded to Run_Step
    :param run_cmd: command line to execute; run() splits it with shlex, so no shell is involved
    :param call_type: forwarded to Run_Step
    :param run_order: forwarded to Run_Step
    :param run_rank: rank of this step; logged (as an integer) when the step starts
    :param should_wait: forwarded to Run_Step
    :param kill_after_seconds: when truthy, run() arms a timer that calls kill() after this many seconds
    """
    Run_Step.__init__(self, run_type, run_cmd, call_type, run_order, run_rank, should_wait, kill_after_seconds)
    # Handle to the subprocess.Popen object; stays None until run() has launched the command.
    self.process = None

  def run(self):
    """
    Run the command, infer time period to be used in metric analysis phase.

    Blocks until the subprocess exits (or is killed by the timer / Ctrl-C). Exceptions other than
    KeyboardInterrupt (e.g. OSError from Popen when the executable is missing) propagate to the caller.
    :return: None
    """
    cmd_args = shlex.split(self.run_cmd)
    logger.info('Local command RUN-STEP starting with rank %d', self.run_rank)
    logger.info('Running subprocess command with following args: %s', cmd_args)

    # Infer time period for analysis. Assume same timezone between client and servers.
    self.ts_start = time.strftime("%Y-%m-%d %H:%M:%S")
    try:
      self.process = subprocess.Popen(cmd_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=1)
      if self.kill_after_seconds:
        # Watchdog: terminate the command if it is still running after the configured timeout.
        self.timer = Timer(self.kill_after_seconds, self.kill)
        self.timer.start()
      # Using 2nd method here to stream output:
      # http://stackoverflow.com/questions/2715847/python-read-streaming-input-from-subprocess-communicate
      for line in iter(self.process.stdout.readline, b''):
        logger.info(line.strip())
      # Output is exhausted; communicate() waits for the process to exit and closes the pipe.
      self.process.communicate()
    except KeyboardInterrupt:
      logger.warning('Handling keyboard interrupt (Ctrl-C)')
      self.kill()
    finally:
      # Always disarm the watchdog, even when an unexpected exception is propagating, so it cannot
      # fire later in the background. getattr() is used because this class only assigns self.timer
      # when kill_after_seconds is set.
      timer = getattr(self, 'timer', None)
      if timer:
        timer.cancel()
    self.ts_end = time.strftime("%Y-%m-%d %H:%M:%S")
    logger.info('subprocess finished')
    logger.info('run_step started at %s and ended at %s', self.ts_start, self.ts_end)

  def kill(self):
    """
    If run_step needs to be killed, this method will be called.

    Sends SIGTERM first, polls every 0.5 seconds for up to CONSTANTS.SECONDS_TO_KILL_AFTER_SIGTERM
    seconds, and falls back to SIGKILL if the process is still alive. OSError raised while
    signalling the process is logged and swallowed.
    :return: None
    """
    if self.process is None:
      # Nothing was launched (e.g. Ctrl-C arrived before Popen returned); there is nothing to signal.
      logger.warning('No subprocess to kill for this run_step')
      return
    try:
      logger.info('Trying to terminating run_step...')
      self.process.terminate()
      time_waited_seconds = 0
      while self.process.poll() is None and time_waited_seconds < CONSTANTS.SECONDS_TO_KILL_AFTER_SIGTERM:
        time.sleep(0.5)
        time_waited_seconds += 0.5
      if self.process.poll() is None:
        # Graceful termination timed out; force the process down.
        self.process.kill()
        logger.warning('Waited %d seconds for run_step to terminate. Killing now....', CONSTANTS.SECONDS_TO_KILL_AFTER_SIGTERM)
    except OSError as e:
      logger.error('Error while trying to kill the subprocess: %s', e)
87