1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler; 19 20 import java.io.IOException; 21 import java.util.ArrayList; 22 import java.util.List; 23 import java.util.Map; 24 import java.util.concurrent.ConcurrentHashMap; 25 import java.util.concurrent.ScheduledThreadPoolExecutor; 26 import java.util.concurrent.ThreadFactory; 27 import java.util.concurrent.TimeUnit; 28 import java.util.concurrent.RejectedExecutionException; 29 30 import org.apache.commons.logging.Log; 31 import org.apache.commons.logging.LogFactory; 32 import org.apache.hadoop.conf.Configuration; 33 import org.apache.hadoop.fs.FileContext; 34 import org.apache.hadoop.fs.Path; 35 import org.apache.hadoop.fs.UnsupportedFileSystemException; 36 import org.apache.hadoop.service.AbstractService; 37 import org.apache.hadoop.yarn.api.records.ApplicationId; 38 import org.apache.hadoop.yarn.conf.YarnConfiguration; 39 import org.apache.hadoop.yarn.event.Dispatcher; 40 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; 41 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; 42 import org.apache.hadoop.yarn.server.nodemanager.DeletionService; 43 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService; 44 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEvent; 45 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationEventType; 46 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppFinishedEvent; 47 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent; 48 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent; 49 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService; 50 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService.RecoveredLogDeleterState; 51 52 import com.google.common.util.concurrent.ThreadFactoryBuilder; 53 54 /** 55 * Log Handler which schedules deletion of log files based on the configured log 56 * retention time. 57 */ 58 public class NonAggregatingLogHandler extends AbstractService implements 59 LogHandler { 60 61 private static final Log LOG = LogFactory 62 .getLog(NonAggregatingLogHandler.class); 63 private final Dispatcher dispatcher; 64 private final DeletionService delService; 65 private final Map<ApplicationId, String> appOwners; 66 67 private final LocalDirsHandlerService dirsHandler; 68 private final NMStateStoreService stateStore; 69 private long deleteDelaySeconds; 70 private ScheduledThreadPoolExecutor sched; 71 NonAggregatingLogHandler(Dispatcher dispatcher, DeletionService delService, LocalDirsHandlerService dirsHandler, NMStateStoreService stateStore)72 public NonAggregatingLogHandler(Dispatcher dispatcher, 73 DeletionService delService, LocalDirsHandlerService dirsHandler, 74 NMStateStoreService stateStore) { 75 super(NonAggregatingLogHandler.class.getName()); 76 this.dispatcher = dispatcher; 77 this.delService = delService; 78 this.dirsHandler = dirsHandler; 79 this.stateStore = stateStore; 80 this.appOwners = new ConcurrentHashMap<ApplicationId, String>(); 81 } 82 83 @Override serviceInit(Configuration conf)84 protected void serviceInit(Configuration conf) throws Exception { 85 // Default 3 hours. 86 this.deleteDelaySeconds = 87 conf.getLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 88 YarnConfiguration.DEFAULT_NM_LOG_RETAIN_SECONDS); 89 sched = createScheduledThreadPoolExecutor(conf); 90 super.serviceInit(conf); 91 recover(); 92 } 93 94 @Override serviceStop()95 protected void serviceStop() throws Exception { 96 if (sched != null) { 97 sched.shutdown(); 98 boolean isShutdown = false; 99 try { 100 isShutdown = sched.awaitTermination(10, TimeUnit.SECONDS); 101 } catch (InterruptedException e) { 102 sched.shutdownNow(); 103 isShutdown = true; 104 } 105 if (!isShutdown) { 106 sched.shutdownNow(); 107 } 108 } 109 super.serviceStop(); 110 } 111 getLocalFileContext(Configuration conf)112 FileContext getLocalFileContext(Configuration conf) { 113 try { 114 return FileContext.getLocalFSFileContext(conf); 115 } catch (IOException e) { 116 throw new YarnRuntimeException("Failed to access local fs"); 117 } 118 } 119 recover()120 private void recover() throws IOException { 121 if (stateStore.canRecover()) { 122 RecoveredLogDeleterState state = stateStore.loadLogDeleterState(); 123 long now = System.currentTimeMillis(); 124 for (Map.Entry<ApplicationId, LogDeleterProto> entry : 125 state.getLogDeleterMap().entrySet()) { 126 ApplicationId appId = entry.getKey(); 127 LogDeleterProto proto = entry.getValue(); 128 long deleteDelayMsec = proto.getDeletionTime() - now; 129 if (LOG.isDebugEnabled()) { 130 LOG.debug("Scheduling deletion of " + appId + " logs in " 131 + deleteDelayMsec + " msec"); 132 } 133 LogDeleterRunnable logDeleter = 134 new LogDeleterRunnable(proto.getUser(), appId); 135 try { 136 sched.schedule(logDeleter, deleteDelayMsec, TimeUnit.MILLISECONDS); 137 } catch (RejectedExecutionException e) { 138 // Handling this event in local thread before starting threads 139 // or after calling sched.shutdownNow(). 140 logDeleter.run(); 141 } 142 } 143 } 144 } 145 146 @SuppressWarnings("unchecked") 147 @Override handle(LogHandlerEvent event)148 public void handle(LogHandlerEvent event) { 149 switch (event.getType()) { 150 case APPLICATION_STARTED: 151 LogHandlerAppStartedEvent appStartedEvent = 152 (LogHandlerAppStartedEvent) event; 153 this.appOwners.put(appStartedEvent.getApplicationId(), 154 appStartedEvent.getUser()); 155 this.dispatcher.getEventHandler().handle( 156 new ApplicationEvent(appStartedEvent.getApplicationId(), 157 ApplicationEventType.APPLICATION_LOG_HANDLING_INITED)); 158 break; 159 case CONTAINER_FINISHED: 160 // Ignore 161 break; 162 case APPLICATION_FINISHED: 163 LogHandlerAppFinishedEvent appFinishedEvent = 164 (LogHandlerAppFinishedEvent) event; 165 ApplicationId appId = appFinishedEvent.getApplicationId(); 166 // Schedule - so that logs are available on the UI till they're deleted. 167 LOG.info("Scheduling Log Deletion for application: " 168 + appId + ", with delay of " 169 + this.deleteDelaySeconds + " seconds"); 170 String user = appOwners.remove(appId); 171 if (user == null) { 172 LOG.error("Unable to locate user for " + appId); 173 break; 174 } 175 LogDeleterRunnable logDeleter = new LogDeleterRunnable(user, appId); 176 long deletionTimestamp = System.currentTimeMillis() 177 + this.deleteDelaySeconds * 1000; 178 LogDeleterProto deleterProto = LogDeleterProto.newBuilder() 179 .setUser(user) 180 .setDeletionTime(deletionTimestamp) 181 .build(); 182 try { 183 stateStore.storeLogDeleter(appId, deleterProto); 184 } catch (IOException e) { 185 LOG.error("Unable to record log deleter state", e); 186 } 187 try { 188 sched.schedule(logDeleter, this.deleteDelaySeconds, 189 TimeUnit.SECONDS); 190 } catch (RejectedExecutionException e) { 191 // Handling this event in local thread before starting threads 192 // or after calling sched.shutdownNow(). 193 logDeleter.run(); 194 } 195 break; 196 default: 197 ; // Ignore 198 } 199 } 200 createScheduledThreadPoolExecutor( Configuration conf)201 ScheduledThreadPoolExecutor createScheduledThreadPoolExecutor( 202 Configuration conf) { 203 ThreadFactory tf = 204 new ThreadFactoryBuilder().setNameFormat("LogDeleter #%d").build(); 205 sched = 206 new ScheduledThreadPoolExecutor(conf.getInt( 207 YarnConfiguration.NM_LOG_DELETION_THREADS_COUNT, 208 YarnConfiguration.DEFAULT_NM_LOG_DELETE_THREAD_COUNT), tf); 209 return sched; 210 } 211 212 class LogDeleterRunnable implements Runnable { 213 private String user; 214 private ApplicationId applicationId; 215 LogDeleterRunnable(String user, ApplicationId applicationId)216 public LogDeleterRunnable(String user, ApplicationId applicationId) { 217 this.user = user; 218 this.applicationId = applicationId; 219 } 220 221 @Override 222 @SuppressWarnings("unchecked") run()223 public void run() { 224 List<Path> localAppLogDirs = new ArrayList<Path>(); 225 FileContext lfs = getLocalFileContext(getConfig()); 226 for (String rootLogDir : dirsHandler.getLogDirsForCleanup()) { 227 Path logDir = new Path(rootLogDir, applicationId.toString()); 228 try { 229 lfs.getFileStatus(logDir); 230 localAppLogDirs.add(logDir); 231 } catch (UnsupportedFileSystemException ue) { 232 LOG.warn("Unsupported file system used for log dir " + logDir, ue); 233 continue; 234 } catch (IOException ie) { 235 continue; 236 } 237 } 238 239 // Inform the application before the actual delete itself, so that links 240 // to logs will no longer be there on NM web-UI. 241 NonAggregatingLogHandler.this.dispatcher.getEventHandler().handle( 242 new ApplicationEvent(this.applicationId, 243 ApplicationEventType.APPLICATION_LOG_HANDLING_FINISHED)); 244 if (localAppLogDirs.size() > 0) { 245 NonAggregatingLogHandler.this.delService.delete(user, null, 246 (Path[]) localAppLogDirs.toArray(new Path[localAppLogDirs.size()])); 247 } 248 try { 249 NonAggregatingLogHandler.this.stateStore.removeLogDeleter( 250 this.applicationId); 251 } catch (IOException e) { 252 LOG.error("Error removing log deletion state", e); 253 } 254 } 255 256 @Override toString()257 public String toString() { 258 return "LogDeleter for AppId " + this.applicationId.toString() 259 + ", owned by " + user; 260 } 261 } 262 }