1 /* 2 * Licensed to the Apache Software Foundation (ASF) under one or more 3 * contributor license agreements. See the NOTICE file distributed with 4 * this work for additional information regarding copyright ownership. 5 * The ASF licenses this file to You under the Apache License, Version 2.0 6 * (the "License"); you may not use this file except in compliance with 7 * the License. You may obtain a copy of the License at 8 * 9 * http://www.apache.org/licenses/LICENSE-2.0 10 * 11 * Unless required by applicable law or agreed to in writing, software 12 * distributed under the License is distributed on an "AS IS" BASIS, 13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14 * See the License for the specific language governing permissions and 15 * limitations under the License. 16 */ 17 18 package org.apache.spark.launcher; 19 20 import java.io.IOException; 21 import java.lang.reflect.Method; 22 import java.util.ArrayList; 23 import java.util.List; 24 import java.util.logging.Level; 25 import java.util.logging.Logger; 26 27 /** 28 * Handle implementation for monitoring apps started as a child process. 29 */ 30 class ChildProcAppHandle implements SparkAppHandle { 31 32 private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName()); 33 34 private final String secret; 35 private final LauncherServer server; 36 37 private Process childProc; 38 private boolean disposed; 39 private LauncherConnection connection; 40 private List<Listener> listeners; 41 private State state; 42 private String appId; 43 private OutputRedirector redirector; 44 ChildProcAppHandle(String secret, LauncherServer server)45 ChildProcAppHandle(String secret, LauncherServer server) { 46 this.secret = secret; 47 this.server = server; 48 this.state = State.UNKNOWN; 49 } 50 51 @Override addListener(Listener l)52 public synchronized void addListener(Listener l) { 53 if (listeners == null) { 54 listeners = new ArrayList<>(); 55 } 56 listeners.add(l); 57 } 58 59 @Override getState()60 public State getState() { 61 return state; 62 } 63 64 @Override getAppId()65 public String getAppId() { 66 return appId; 67 } 68 69 @Override stop()70 public void stop() { 71 CommandBuilderUtils.checkState(connection != null, "Application is still not connected."); 72 try { 73 connection.send(new LauncherProtocol.Stop()); 74 } catch (IOException ioe) { 75 throw new RuntimeException(ioe); 76 } 77 } 78 79 @Override disconnect()80 public synchronized void disconnect() { 81 if (!disposed) { 82 disposed = true; 83 if (connection != null) { 84 try { 85 connection.close(); 86 } catch (IOException ioe) { 87 // no-op. 88 } 89 } 90 server.unregister(this); 91 if (redirector != null) { 92 redirector.stop(); 93 } 94 } 95 } 96 97 @Override kill()98 public synchronized void kill() { 99 if (!disposed) { 100 disconnect(); 101 } 102 if (childProc != null) { 103 try { 104 childProc.exitValue(); 105 } catch (IllegalThreadStateException e) { 106 // Child is still alive. Try to use Java 8's "destroyForcibly()" if available, 107 // fall back to the old API if it's not there. 108 try { 109 Method destroy = childProc.getClass().getMethod("destroyForcibly"); 110 destroy.invoke(childProc); 111 } catch (Exception inner) { 112 childProc.destroy(); 113 } 114 } finally { 115 childProc = null; 116 } 117 } 118 } 119 getSecret()120 String getSecret() { 121 return secret; 122 } 123 setChildProc(Process childProc, String loggerName)124 void setChildProc(Process childProc, String loggerName) { 125 this.childProc = childProc; 126 this.redirector = new OutputRedirector(childProc.getInputStream(), loggerName, 127 SparkLauncher.REDIRECTOR_FACTORY); 128 } 129 setConnection(LauncherConnection connection)130 void setConnection(LauncherConnection connection) { 131 this.connection = connection; 132 } 133 getServer()134 LauncherServer getServer() { 135 return server; 136 } 137 getConnection()138 LauncherConnection getConnection() { 139 return connection; 140 } 141 setState(State s)142 void setState(State s) { 143 if (!state.isFinal()) { 144 state = s; 145 fireEvent(false); 146 } else { 147 LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.", 148 new Object[] { state, s }); 149 } 150 } 151 setAppId(String appId)152 void setAppId(String appId) { 153 this.appId = appId; 154 fireEvent(true); 155 } 156 fireEvent(boolean isInfoChanged)157 private synchronized void fireEvent(boolean isInfoChanged) { 158 if (listeners != null) { 159 for (Listener l : listeners) { 160 if (isInfoChanged) { 161 l.infoChanged(this); 162 } else { 163 l.stateChanged(this); 164 } 165 } 166 } 167 } 168 169 } 170