1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one or more
3  * contributor license agreements.  See the NOTICE file distributed with
4  * this work for additional information regarding copyright ownership.
5  * The ASF licenses this file to You under the Apache License, Version 2.0
6  * (the "License"); you may not use this file except in compliance with
7  * the License.  You may obtain a copy of the License at
8  *
9  *    http://www.apache.org/licenses/LICENSE-2.0
10  *
11  * Unless required by applicable law or agreed to in writing, software
12  * distributed under the License is distributed on an "AS IS" BASIS,
13  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14  * See the License for the specific language governing permissions and
15  * limitations under the License.
16  */
17 
18 package org.apache.spark.launcher;
19 
20 import java.io.IOException;
21 import java.lang.reflect.Method;
22 import java.util.ArrayList;
23 import java.util.List;
24 import java.util.logging.Level;
25 import java.util.logging.Logger;
26 
27 /**
28  * Handle implementation for monitoring apps started as a child process.
29  */
30 class ChildProcAppHandle implements SparkAppHandle {
31 
32   private static final Logger LOG = Logger.getLogger(ChildProcAppHandle.class.getName());
33 
34   private final String secret;
35   private final LauncherServer server;
36 
37   private Process childProc;
38   private boolean disposed;
39   private LauncherConnection connection;
40   private List<Listener> listeners;
41   private State state;
42   private String appId;
43   private OutputRedirector redirector;
44 
ChildProcAppHandle(String secret, LauncherServer server)45   ChildProcAppHandle(String secret, LauncherServer server) {
46     this.secret = secret;
47     this.server = server;
48     this.state = State.UNKNOWN;
49   }
50 
51   @Override
addListener(Listener l)52   public synchronized void addListener(Listener l) {
53     if (listeners == null) {
54       listeners = new ArrayList<>();
55     }
56     listeners.add(l);
57   }
58 
59   @Override
getState()60   public State getState() {
61     return state;
62   }
63 
64   @Override
getAppId()65   public String getAppId() {
66     return appId;
67   }
68 
69   @Override
stop()70   public void stop() {
71     CommandBuilderUtils.checkState(connection != null, "Application is still not connected.");
72     try {
73       connection.send(new LauncherProtocol.Stop());
74     } catch (IOException ioe) {
75       throw new RuntimeException(ioe);
76     }
77   }
78 
79   @Override
disconnect()80   public synchronized void disconnect() {
81     if (!disposed) {
82       disposed = true;
83       if (connection != null) {
84         try {
85           connection.close();
86         } catch (IOException ioe) {
87           // no-op.
88         }
89       }
90       server.unregister(this);
91       if (redirector != null) {
92         redirector.stop();
93       }
94     }
95   }
96 
97   @Override
kill()98   public synchronized void kill() {
99     if (!disposed) {
100       disconnect();
101     }
102     if (childProc != null) {
103       try {
104         childProc.exitValue();
105       } catch (IllegalThreadStateException e) {
106         // Child is still alive. Try to use Java 8's "destroyForcibly()" if available,
107         // fall back to the old API if it's not there.
108         try {
109           Method destroy = childProc.getClass().getMethod("destroyForcibly");
110           destroy.invoke(childProc);
111         } catch (Exception inner) {
112           childProc.destroy();
113         }
114       } finally {
115         childProc = null;
116       }
117     }
118   }
119 
getSecret()120   String getSecret() {
121     return secret;
122   }
123 
setChildProc(Process childProc, String loggerName)124   void setChildProc(Process childProc, String loggerName) {
125     this.childProc = childProc;
126     this.redirector = new OutputRedirector(childProc.getInputStream(), loggerName,
127       SparkLauncher.REDIRECTOR_FACTORY);
128   }
129 
setConnection(LauncherConnection connection)130   void setConnection(LauncherConnection connection) {
131     this.connection = connection;
132   }
133 
getServer()134   LauncherServer getServer() {
135     return server;
136   }
137 
getConnection()138   LauncherConnection getConnection() {
139     return connection;
140   }
141 
setState(State s)142   void setState(State s) {
143     if (!state.isFinal()) {
144       state = s;
145       fireEvent(false);
146     } else {
147       LOG.log(Level.WARNING, "Backend requested transition from final state {0} to {1}.",
148         new Object[] { state, s });
149     }
150   }
151 
setAppId(String appId)152   void setAppId(String appId) {
153     this.appId = appId;
154     fireEvent(true);
155   }
156 
fireEvent(boolean isInfoChanged)157   private synchronized void fireEvent(boolean isInfoChanged) {
158     if (listeners != null) {
159       for (Listener l : listeners) {
160         if (isInfoChanged) {
161           l.infoChanged(this);
162         } else {
163           l.stateChanged(this);
164         }
165       }
166     }
167   }
168 
169 }
170