1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler;
19 
20 import java.io.IOException;
21 import java.util.ArrayList;
22 import java.util.List;
23 import java.util.Map;
24 import java.util.concurrent.ConcurrentHashMap;
25 import java.util.concurrent.ScheduledThreadPoolExecutor;
26 import java.util.concurrent.ThreadFactory;
27 import java.util.concurrent.TimeUnit;
28 import java.util.concurrent.RejectedExecutionException;
29 
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32 import org.apache.hadoop.conf.Configuration;
33 import org.apache.hadoop.fs.FileContext;
34 import org.apache.hadoop.fs.Path;
35 import org.apache.hadoop.fs.UnsupportedFileSystemException;
36 import org.apache.hadoop.service.AbstractService;
37 import org.apache.hadoop.yarn.api.records.ApplicationId;
38 import org.apache.hadoop.yarn.conf.YarnConfiguration;
39 import org.apache.hadoop.yarn.event.Dispatcher;
40 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
41 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
42 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
43 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
44 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent;
45 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType;
46 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent;
47 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
48 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
49 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
50 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLogDeleterState;
51 
52 import com.google.common.util.concurrent.ThreadFactoryBuilder;
53 
54 /**
55  * Log Handler which schedules deletion of log files based on the configured log
56  * retention time.
57  */
58 public class NonAggregatingLogHandler extends AbstractService implements
59     LogHandler {
60 
61   private static final Log LOG = LogFactory
62       .getLog(NonAggregatingLogHandler.class);
63   private final Dispatcher dispatcher;
64   private final DeletionService delService;
65   private final Map<ApplicationId, String> appOwners;
66 
67   private final LocalDirsHandlerService dirsHandler;
68   private final NMStateStoreService stateStore;
69   private long deleteDelaySeconds;
70   private ScheduledThreadPoolExecutor sched;
71 
NonAggregatingLogHandler(Dispatcher dispatcher, DeletionService delService, LocalDirsHandlerService dirsHandler, NMStateStoreService stateStore)72   public NonAggregatingLogHandler(Dispatcher dispatcher,
73       DeletionService delService, LocalDirsHandlerService dirsHandler,
74       NMStateStoreService stateStore) {
75     super(NonAggregatingLogHandler.class.getName());
76     this.dispatcher = dispatcher;
77     this.delService = delService;
78     this.dirsHandler = dirsHandler;
79     this.stateStore = stateStore;
80     this.appOwners = new ConcurrentHashMap<ApplicationId, String>();
81   }
82 
83   @Override
serviceInit(Configuration conf)84   protected void serviceInit(Configuration conf) throws Exception {
85     // Default 3 hours.
86     this.deleteDelaySeconds =
87         conf.getLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS,
88                 YarnConfiguration.DEFAULT_NM_LOG_RETAIN_SECONDS);
89     sched = createScheduledThreadPoolExecutor(conf);
90     super.serviceInit(conf);
91     recover();
92   }
93 
94   @Override
serviceStop()95   protected void serviceStop() throws Exception {
96     if (sched != null) {
97       sched.shutdown();
98       boolean isShutdown = false;
99       try {
100         isShutdown = sched.awaitTermination(10, TimeUnit.SECONDS);
101       } catch (InterruptedException e) {
102         sched.shutdownNow();
103         isShutdown = true;
104       }
105       if (!isShutdown) {
106         sched.shutdownNow();
107       }
108     }
109     super.serviceStop();
110   }
111 
getLocalFileContext(Configuration conf)112   FileContext getLocalFileContext(Configuration conf) {
113     try {
114       return FileContext.getLocalFSFileContext(conf);
115     } catch (IOException e) {
116       throw new YarnRuntimeException("Failed to access local fs");
117     }
118   }
119 
recover()120   private void recover() throws IOException {
121     if (stateStore.canRecover()) {
122       RecoveredLogDeleterState state = stateStore.loadLogDeleterState();
123       long now = System.currentTimeMillis();
124       for (Map.Entry<ApplicationId, LogDeleterProto> entry :
125         state.getLogDeleterMap().entrySet()) {
126         ApplicationId appId = entry.getKey();
127         LogDeleterProto proto = entry.getValue();
128         long deleteDelayMsec = proto.getDeletionTime() - now;
129         if (LOG.isDebugEnabled()) {
130           LOG.debug("Scheduling deletion of " + appId + " logs in "
131               + deleteDelayMsec + " msec");
132         }
133         LogDeleterRunnable logDeleter =
134             new LogDeleterRunnable(proto.getUser(), appId);
135         try {
136           sched.schedule(logDeleter, deleteDelayMsec, TimeUnit.MILLISECONDS);
137         } catch (RejectedExecutionException e) {
138           // Handling this event in local thread before starting threads
139           // or after calling sched.shutdownNow().
140           logDeleter.run();
141         }
142       }
143     }
144   }
145 
146   @SuppressWarnings("unchecked")
147   @Override
handle(LogHandlerEvent event)148   public void handle(LogHandlerEvent event) {
149     switch (event.getType()) {
150       case APPLICATION_STARTED:
151         LogHandlerAppStartedEvent appStartedEvent =
152             (LogHandlerAppStartedEvent) event;
153         this.appOwners.put(appStartedEvent.getApplicationId(),
154             appStartedEvent.getUser());
155         this.dispatcher.getEventHandler().handle(
156             new ApplicationEvent(appStartedEvent.getApplicationId(),
157                 ApplicationEventType.APPLICATION_LOG_HANDLING_INITED));
158         break;
159       case CONTAINER_FINISHED:
160         // Ignore
161         break;
162       case APPLICATION_FINISHED:
163         LogHandlerAppFinishedEvent appFinishedEvent =
164             (LogHandlerAppFinishedEvent) event;
165         ApplicationId appId = appFinishedEvent.getApplicationId();
166         // Schedule - so that logs are available on the UI till they're deleted.
167         LOG.info("Scheduling Log Deletion for application: "
168             + appId + ", with delay of "
169             + this.deleteDelaySeconds + " seconds");
170         String user = appOwners.remove(appId);
171         if (user == null) {
172           LOG.error("Unable to locate user for " + appId);
173           break;
174         }
175         LogDeleterRunnable logDeleter = new LogDeleterRunnable(user, appId);
176         long deletionTimestamp = System.currentTimeMillis()
177             + this.deleteDelaySeconds * 1000;
178         LogDeleterProto deleterProto = LogDeleterProto.newBuilder()
179             .setUser(user)
180             .setDeletionTime(deletionTimestamp)
181             .build();
182         try {
183           stateStore.storeLogDeleter(appId, deleterProto);
184         } catch (IOException e) {
185           LOG.error("Unable to record log deleter state", e);
186         }
187         try {
188           sched.schedule(logDeleter, this.deleteDelaySeconds,
189               TimeUnit.SECONDS);
190         } catch (RejectedExecutionException e) {
191           // Handling this event in local thread before starting threads
192           // or after calling sched.shutdownNow().
193           logDeleter.run();
194         }
195         break;
196       default:
197         ; // Ignore
198     }
199   }
200 
createScheduledThreadPoolExecutor( Configuration conf)201   ScheduledThreadPoolExecutor createScheduledThreadPoolExecutor(
202       Configuration conf) {
203     ThreadFactory tf =
204         new ThreadFactoryBuilder().setNameFormat("LogDeleter #%d").build();
205     sched =
206         new ScheduledThreadPoolExecutor(conf.getInt(
207             YarnConfiguration.NM_LOG_DELETION_THREADS_COUNT,
208             YarnConfiguration.DEFAULT_NM_LOG_DELETE_THREAD_COUNT), tf);
209     return sched;
210   }
211 
212   class LogDeleterRunnable implements Runnable {
213     private String user;
214     private ApplicationId applicationId;
215 
LogDeleterRunnable(String user, ApplicationId applicationId)216     public LogDeleterRunnable(String user, ApplicationId applicationId) {
217       this.user = user;
218       this.applicationId = applicationId;
219     }
220 
221     @Override
222     @SuppressWarnings("unchecked")
run()223     public void run() {
224       List<Path> localAppLogDirs = new ArrayList<Path>();
225       FileContext lfs = getLocalFileContext(getConfig());
226       for (String rootLogDir : dirsHandler.getLogDirsForCleanup()) {
227         Path logDir = new Path(rootLogDir, applicationId.toString());
228         try {
229           lfs.getFileStatus(logDir);
230           localAppLogDirs.add(logDir);
231         } catch (UnsupportedFileSystemException ue) {
232           LOG.warn("Unsupported file system used for log dir " + logDir, ue);
233           continue;
234         } catch (IOException ie) {
235           continue;
236         }
237       }
238 
239       // Inform the application before the actual delete itself, so that links
240       // to logs will no longer be there on NM web-UI.
241       NonAggregatingLogHandler.this.dispatcher.getEventHandler().handle(
242         new ApplicationEvent(this.applicationId,
243           ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED));
244       if (localAppLogDirs.size() > 0) {
245         NonAggregatingLogHandler.this.delService.delete(user, null,
246           (Path[]) localAppLogDirs.toArray(new Path[localAppLogDirs.size()]));
247       }
248       try {
249         NonAggregatingLogHandler.this.stateStore.removeLogDeleter(
250             this.applicationId);
251       } catch (IOException e) {
252         LOG.error("Error removing log deletion state", e);
253       }
254     }
255 
256     @Override
toString()257     public String toString() {
258       return "LogDeleter for AppId " + this.applicationId.toString()
259           + ", owned by " + user;
260     }
261   }
262 }