1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.yarn.server.nodemanager.recovery; 20 21 import java.io.IOException; 22 import java.util.ArrayList; 23 import java.util.HashMap; 24 import java.util.HashSet; 25 import java.util.List; 26 import java.util.Map; 27 import java.util.Set; 28 29 import org.apache.hadoop.conf.Configuration; 30 import org.apache.hadoop.fs.Path; 31 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest; 32 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; 33 import org.apache.hadoop.yarn.api.records.ApplicationId; 34 import org.apache.hadoop.yarn.api.records.ContainerExitStatus; 35 import org.apache.hadoop.yarn.api.records.ContainerId; 36 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto; 37 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto; 38 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto; 39 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto; 40 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto; 41 import org.apache.hadoop.yarn.server.api.records.MasterKey; 42 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl; 43 44 public class NMMemoryStateStoreService extends NMStateStoreService { 45 private Map<ApplicationId, ContainerManagerApplicationProto> apps; 46 private Set<ApplicationId> finishedApps; 47 private Map<ContainerId, RecoveredContainerState> containerStates; 48 private Map<TrackerKey, TrackerState> trackerStates; 49 private Map<Integer, DeletionServiceDeleteTaskProto> deleteTasks; 50 private RecoveredNMTokensState nmTokenState; 51 private RecoveredContainerTokensState containerTokenState; 52 private Map<ApplicationId, LogDeleterProto> logDeleterState; 53 NMMemoryStateStoreService()54 public NMMemoryStateStoreService() { 55 super(NMMemoryStateStoreService.class.getName()); 56 } 57 58 @Override initStorage(Configuration conf)59 protected void initStorage(Configuration conf) { 60 apps = new HashMap<ApplicationId, ContainerManagerApplicationProto>(); 61 finishedApps = new HashSet<ApplicationId>(); 62 containerStates = new HashMap<ContainerId, RecoveredContainerState>(); 63 nmTokenState = new RecoveredNMTokensState(); 64 nmTokenState.applicationMasterKeys = 65 new HashMap<ApplicationAttemptId, MasterKey>(); 66 containerTokenState = new RecoveredContainerTokensState(); 67 containerTokenState.activeTokens = new HashMap<ContainerId, Long>(); 68 trackerStates = new HashMap<TrackerKey, TrackerState>(); 69 deleteTasks = new HashMap<Integer, DeletionServiceDeleteTaskProto>(); 70 logDeleterState = new HashMap<ApplicationId, LogDeleterProto>(); 71 } 72 73 @Override startStorage()74 protected void startStorage() { 75 } 76 77 @Override closeStorage()78 protected void closeStorage() { 79 } 80 81 82 @Override loadApplicationsState()83 public synchronized RecoveredApplicationsState loadApplicationsState() 84 throws IOException { 85 RecoveredApplicationsState state = new RecoveredApplicationsState(); 86 state.applications = new ArrayList<ContainerManagerApplicationProto>( 87 apps.values()); 88 state.finishedApplications = new ArrayList<ApplicationId>(finishedApps); 89 return state; 90 } 91 92 @Override storeApplication(ApplicationId appId, ContainerManagerApplicationProto proto)93 public synchronized void storeApplication(ApplicationId appId, 94 ContainerManagerApplicationProto proto) throws IOException { 95 ContainerManagerApplicationProto protoCopy = 96 ContainerManagerApplicationProto.parseFrom(proto.toByteString()); 97 apps.put(appId, protoCopy); 98 } 99 100 @Override storeFinishedApplication(ApplicationId appId)101 public synchronized void storeFinishedApplication(ApplicationId appId) { 102 finishedApps.add(appId); 103 } 104 105 @Override removeApplication(ApplicationId appId)106 public synchronized void removeApplication(ApplicationId appId) 107 throws IOException { 108 apps.remove(appId); 109 finishedApps.remove(appId); 110 } 111 112 @Override loadContainersState()113 public synchronized List<RecoveredContainerState> loadContainersState() 114 throws IOException { 115 // return a copy so caller can't modify our state 116 List<RecoveredContainerState> result = 117 new ArrayList<RecoveredContainerState>(containerStates.size()); 118 for (RecoveredContainerState rcs : containerStates.values()) { 119 RecoveredContainerState rcsCopy = new RecoveredContainerState(); 120 rcsCopy.status = rcs.status; 121 rcsCopy.exitCode = rcs.exitCode; 122 rcsCopy.killed = rcs.killed; 123 rcsCopy.diagnostics = rcs.diagnostics; 124 rcsCopy.startRequest = rcs.startRequest; 125 result.add(rcsCopy); 126 } 127 return new ArrayList<RecoveredContainerState>(); 128 } 129 130 @Override storeContainer(ContainerId containerId, StartContainerRequest startRequest)131 public synchronized void storeContainer(ContainerId containerId, 132 StartContainerRequest startRequest) throws IOException { 133 RecoveredContainerState rcs = new RecoveredContainerState(); 134 rcs.startRequest = startRequest; 135 containerStates.put(containerId, rcs); 136 } 137 138 @Override storeContainerDiagnostics(ContainerId containerId, StringBuilder diagnostics)139 public synchronized void storeContainerDiagnostics(ContainerId containerId, 140 StringBuilder diagnostics) throws IOException { 141 RecoveredContainerState rcs = getRecoveredContainerState(containerId); 142 rcs.diagnostics = diagnostics.toString(); 143 } 144 145 @Override storeContainerLaunched(ContainerId containerId)146 public synchronized void storeContainerLaunched(ContainerId containerId) 147 throws IOException { 148 RecoveredContainerState rcs = getRecoveredContainerState(containerId); 149 if (rcs.exitCode != ContainerExitStatus.INVALID) { 150 throw new IOException("Container already completed"); 151 } 152 rcs.status = RecoveredContainerStatus.LAUNCHED; 153 } 154 155 @Override storeContainerKilled(ContainerId containerId)156 public synchronized void storeContainerKilled(ContainerId containerId) 157 throws IOException { 158 RecoveredContainerState rcs = getRecoveredContainerState(containerId); 159 rcs.killed = true; 160 } 161 162 @Override storeContainerCompleted(ContainerId containerId, int exitCode)163 public synchronized void storeContainerCompleted(ContainerId containerId, 164 int exitCode) throws IOException { 165 RecoveredContainerState rcs = getRecoveredContainerState(containerId); 166 rcs.status = RecoveredContainerStatus.COMPLETED; 167 rcs.exitCode = exitCode; 168 } 169 170 @Override removeContainer(ContainerId containerId)171 public synchronized void removeContainer(ContainerId containerId) 172 throws IOException { 173 containerStates.remove(containerId); 174 } 175 getRecoveredContainerState( ContainerId containerId)176 private RecoveredContainerState getRecoveredContainerState( 177 ContainerId containerId) throws IOException { 178 RecoveredContainerState rcs = containerStates.get(containerId); 179 if (rcs == null) { 180 throw new IOException("No start request for " + containerId); 181 } 182 return rcs; 183 } 184 loadTrackerState(TrackerState ts)185 private LocalResourceTrackerState loadTrackerState(TrackerState ts) { 186 LocalResourceTrackerState result = new LocalResourceTrackerState(); 187 result.localizedResources.addAll(ts.localizedResources.values()); 188 for (Map.Entry<Path, LocalResourceProto> entry : 189 ts.inProgressMap.entrySet()) { 190 result.inProgressResources.put(entry.getValue(), entry.getKey()); 191 } 192 return result; 193 } 194 getTrackerState(TrackerKey key)195 private TrackerState getTrackerState(TrackerKey key) { 196 TrackerState ts = trackerStates.get(key); 197 if (ts == null) { 198 ts = new TrackerState(); 199 trackerStates.put(key, ts); 200 } 201 return ts; 202 } 203 204 @Override loadLocalizationState()205 public synchronized RecoveredLocalizationState loadLocalizationState() { 206 RecoveredLocalizationState result = new RecoveredLocalizationState(); 207 for (Map.Entry<TrackerKey, TrackerState> e : trackerStates.entrySet()) { 208 TrackerKey tk = e.getKey(); 209 TrackerState ts = e.getValue(); 210 // check what kind of tracker state we have and recover appropriately 211 // public trackers have user == null 212 // private trackers have a valid user but appId == null 213 // app-specific trackers have a valid user and valid appId 214 if (tk.user == null) { 215 result.publicTrackerState = loadTrackerState(ts); 216 } else { 217 RecoveredUserResources rur = result.userResources.get(tk.user); 218 if (rur == null) { 219 rur = new RecoveredUserResources(); 220 result.userResources.put(tk.user, rur); 221 } 222 if (tk.appId == null) { 223 rur.privateTrackerState = loadTrackerState(ts); 224 } else { 225 rur.appTrackerStates.put(tk.appId, loadTrackerState(ts)); 226 } 227 } 228 } 229 return result; 230 } 231 232 @Override startResourceLocalization(String user, ApplicationId appId, LocalResourceProto proto, Path localPath)233 public synchronized void startResourceLocalization(String user, 234 ApplicationId appId, LocalResourceProto proto, Path localPath) { 235 TrackerState ts = getTrackerState(new TrackerKey(user, appId)); 236 ts.inProgressMap.put(localPath, proto); 237 } 238 239 @Override finishResourceLocalization(String user, ApplicationId appId, LocalizedResourceProto proto)240 public synchronized void finishResourceLocalization(String user, 241 ApplicationId appId, LocalizedResourceProto proto) { 242 TrackerState ts = getTrackerState(new TrackerKey(user, appId)); 243 Path localPath = new Path(proto.getLocalPath()); 244 ts.inProgressMap.remove(localPath); 245 ts.localizedResources.put(localPath, proto); 246 } 247 248 @Override removeLocalizedResource(String user, ApplicationId appId, Path localPath)249 public synchronized void removeLocalizedResource(String user, 250 ApplicationId appId, Path localPath) { 251 TrackerState ts = trackerStates.get(new TrackerKey(user, appId)); 252 if (ts != null) { 253 ts.inProgressMap.remove(localPath); 254 ts.localizedResources.remove(localPath); 255 } 256 } 257 258 259 @Override loadDeletionServiceState()260 public synchronized RecoveredDeletionServiceState loadDeletionServiceState() 261 throws IOException { 262 RecoveredDeletionServiceState result = 263 new RecoveredDeletionServiceState(); 264 result.tasks = new ArrayList<DeletionServiceDeleteTaskProto>( 265 deleteTasks.values()); 266 return result; 267 } 268 269 @Override storeDeletionTask(int taskId, DeletionServiceDeleteTaskProto taskProto)270 public synchronized void storeDeletionTask(int taskId, 271 DeletionServiceDeleteTaskProto taskProto) throws IOException { 272 deleteTasks.put(taskId, taskProto); 273 } 274 275 @Override removeDeletionTask(int taskId)276 public synchronized void removeDeletionTask(int taskId) throws IOException { 277 deleteTasks.remove(taskId); 278 } 279 280 281 @Override loadNMTokensState()282 public synchronized RecoveredNMTokensState loadNMTokensState() 283 throws IOException { 284 // return a copy so caller can't modify our state 285 RecoveredNMTokensState result = new RecoveredNMTokensState(); 286 result.currentMasterKey = nmTokenState.currentMasterKey; 287 result.previousMasterKey = nmTokenState.previousMasterKey; 288 result.applicationMasterKeys = 289 new HashMap<ApplicationAttemptId, MasterKey>( 290 nmTokenState.applicationMasterKeys); 291 return result; 292 } 293 294 @Override storeNMTokenCurrentMasterKey(MasterKey key)295 public synchronized void storeNMTokenCurrentMasterKey(MasterKey key) 296 throws IOException { 297 MasterKeyPBImpl keypb = (MasterKeyPBImpl) key; 298 nmTokenState.currentMasterKey = new MasterKeyPBImpl(keypb.getProto()); 299 } 300 301 @Override storeNMTokenPreviousMasterKey(MasterKey key)302 public synchronized void storeNMTokenPreviousMasterKey(MasterKey key) 303 throws IOException { 304 MasterKeyPBImpl keypb = (MasterKeyPBImpl) key; 305 nmTokenState.previousMasterKey = new MasterKeyPBImpl(keypb.getProto()); 306 } 307 308 @Override storeNMTokenApplicationMasterKey( ApplicationAttemptId attempt, MasterKey key)309 public synchronized void storeNMTokenApplicationMasterKey( 310 ApplicationAttemptId attempt, MasterKey key) throws IOException { 311 MasterKeyPBImpl keypb = (MasterKeyPBImpl) key; 312 nmTokenState.applicationMasterKeys.put(attempt, 313 new MasterKeyPBImpl(keypb.getProto())); 314 } 315 316 @Override removeNMTokenApplicationMasterKey( ApplicationAttemptId attempt)317 public synchronized void removeNMTokenApplicationMasterKey( 318 ApplicationAttemptId attempt) throws IOException { 319 nmTokenState.applicationMasterKeys.remove(attempt); 320 } 321 322 323 @Override loadContainerTokensState()324 public synchronized RecoveredContainerTokensState loadContainerTokensState() 325 throws IOException { 326 // return a copy so caller can't modify our state 327 RecoveredContainerTokensState result = 328 new RecoveredContainerTokensState(); 329 result.currentMasterKey = containerTokenState.currentMasterKey; 330 result.previousMasterKey = containerTokenState.previousMasterKey; 331 result.activeTokens = 332 new HashMap<ContainerId, Long>(containerTokenState.activeTokens); 333 return result; 334 } 335 336 @Override storeContainerTokenCurrentMasterKey(MasterKey key)337 public synchronized void storeContainerTokenCurrentMasterKey(MasterKey key) 338 throws IOException { 339 MasterKeyPBImpl keypb = (MasterKeyPBImpl) key; 340 containerTokenState.currentMasterKey = 341 new MasterKeyPBImpl(keypb.getProto()); 342 } 343 344 @Override storeContainerTokenPreviousMasterKey(MasterKey key)345 public synchronized void storeContainerTokenPreviousMasterKey(MasterKey key) 346 throws IOException { 347 MasterKeyPBImpl keypb = (MasterKeyPBImpl) key; 348 containerTokenState.previousMasterKey = 349 new MasterKeyPBImpl(keypb.getProto()); 350 } 351 352 @Override storeContainerToken(ContainerId containerId, Long expirationTime)353 public synchronized void storeContainerToken(ContainerId containerId, 354 Long expirationTime) throws IOException { 355 containerTokenState.activeTokens.put(containerId, expirationTime); 356 } 357 358 @Override removeContainerToken(ContainerId containerId)359 public synchronized void removeContainerToken(ContainerId containerId) 360 throws IOException { 361 containerTokenState.activeTokens.remove(containerId); 362 } 363 364 365 @Override loadLogDeleterState()366 public synchronized RecoveredLogDeleterState loadLogDeleterState() 367 throws IOException { 368 RecoveredLogDeleterState state = new RecoveredLogDeleterState(); 369 state.logDeleterMap = new HashMap<ApplicationId,LogDeleterProto>( 370 logDeleterState); 371 return state; 372 } 373 374 @Override storeLogDeleter(ApplicationId appId, LogDeleterProto proto)375 public synchronized void storeLogDeleter(ApplicationId appId, 376 LogDeleterProto proto) 377 throws IOException { 378 logDeleterState.put(appId, proto); 379 } 380 381 @Override removeLogDeleter(ApplicationId appId)382 public synchronized void removeLogDeleter(ApplicationId appId) 383 throws IOException { 384 logDeleterState.remove(appId); 385 } 386 387 388 private static class TrackerState { 389 Map<Path, LocalResourceProto> inProgressMap = 390 new HashMap<Path, LocalResourceProto>(); 391 Map<Path, LocalizedResourceProto> localizedResources = 392 new HashMap<Path, LocalizedResourceProto>(); 393 } 394 395 private static class TrackerKey { 396 String user; 397 ApplicationId appId; 398 TrackerKey(String user, ApplicationId appId)399 public TrackerKey(String user, ApplicationId appId) { 400 this.user = user; 401 this.appId = appId; 402 } 403 404 @Override hashCode()405 public int hashCode() { 406 final int prime = 31; 407 int result = 1; 408 result = prime * result + ((appId == null) ? 0 : appId.hashCode()); 409 result = prime * result + ((user == null) ? 0 : user.hashCode()); 410 return result; 411 } 412 413 @Override equals(Object obj)414 public boolean equals(Object obj) { 415 if (this == obj) 416 return true; 417 if (obj == null) 418 return false; 419 if (!(obj instanceof TrackerKey)) 420 return false; 421 TrackerKey other = (TrackerKey) obj; 422 if (appId == null) { 423 if (other.appId != null) 424 return false; 425 } else if (!appId.equals(other.appId)) 426 return false; 427 if (user == null) { 428 if (other.user != null) 429 return false; 430 } else if (!user.equals(other.user)) 431 return false; 432 return true; 433 } 434 } 435 } 436