1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.yarn.server.nodemanager.recovery;
20 
21 import java.io.IOException;
22 import java.util.ArrayList;
23 import java.util.HashMap;
24 import java.util.HashSet;
25 import java.util.List;
26 import java.util.Map;
27 import java.util.Set;
28 
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.fs.Path;
31 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
32 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
33 import org.apache.hadoop.yarn.api.records.ApplicationId;
34 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
35 import org.apache.hadoop.yarn.api.records.ContainerId;
36 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
37 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
38 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
39 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
40 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LogDeleterProto;
41 import org.apache.hadoop.yarn.server.api.records.MasterKey;
42 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
43 
44 public class NMMemoryStateStoreService extends NMStateStoreService {
45   private Map<ApplicationId, ContainerManagerApplicationProto> apps;
46   private Set<ApplicationId> finishedApps;
47   private Map<ContainerId, RecoveredContainerState> containerStates;
48   private Map<TrackerKey, TrackerState> trackerStates;
49   private Map<Integer, DeletionServiceDeleteTaskProto> deleteTasks;
50   private RecoveredNMTokensState nmTokenState;
51   private RecoveredContainerTokensState containerTokenState;
52   private Map<ApplicationId, LogDeleterProto> logDeleterState;
53 
NMMemoryStateStoreService()54   public NMMemoryStateStoreService() {
55     super(NMMemoryStateStoreService.class.getName());
56   }
57 
58   @Override
initStorage(Configuration conf)59   protected void initStorage(Configuration conf) {
60     apps = new HashMap<ApplicationId, ContainerManagerApplicationProto>();
61     finishedApps = new HashSet<ApplicationId>();
62     containerStates = new HashMap<ContainerId, RecoveredContainerState>();
63     nmTokenState = new RecoveredNMTokensState();
64     nmTokenState.applicationMasterKeys =
65         new HashMap<ApplicationAttemptId, MasterKey>();
66     containerTokenState = new RecoveredContainerTokensState();
67     containerTokenState.activeTokens = new HashMap<ContainerId, Long>();
68     trackerStates = new HashMap<TrackerKey, TrackerState>();
69     deleteTasks = new HashMap<Integer, DeletionServiceDeleteTaskProto>();
70     logDeleterState = new HashMap<ApplicationId, LogDeleterProto>();
71   }
72 
73   @Override
startStorage()74   protected void startStorage() {
75   }
76 
77   @Override
closeStorage()78   protected void closeStorage() {
79   }
80 
81 
82   @Override
loadApplicationsState()83   public synchronized RecoveredApplicationsState loadApplicationsState()
84       throws IOException {
85     RecoveredApplicationsState state = new RecoveredApplicationsState();
86     state.applications = new ArrayList<ContainerManagerApplicationProto>(
87         apps.values());
88     state.finishedApplications = new ArrayList<ApplicationId>(finishedApps);
89     return state;
90   }
91 
92   @Override
storeApplication(ApplicationId appId, ContainerManagerApplicationProto proto)93   public synchronized void storeApplication(ApplicationId appId,
94       ContainerManagerApplicationProto proto) throws IOException {
95     ContainerManagerApplicationProto protoCopy =
96         ContainerManagerApplicationProto.parseFrom(proto.toByteString());
97     apps.put(appId, protoCopy);
98   }
99 
100   @Override
storeFinishedApplication(ApplicationId appId)101   public synchronized void storeFinishedApplication(ApplicationId appId) {
102     finishedApps.add(appId);
103   }
104 
105   @Override
removeApplication(ApplicationId appId)106   public synchronized void removeApplication(ApplicationId appId)
107       throws IOException {
108     apps.remove(appId);
109     finishedApps.remove(appId);
110   }
111 
112   @Override
loadContainersState()113   public synchronized List<RecoveredContainerState> loadContainersState()
114       throws IOException {
115     // return a copy so caller can't modify our state
116     List<RecoveredContainerState> result =
117         new ArrayList<RecoveredContainerState>(containerStates.size());
118     for (RecoveredContainerState rcs : containerStates.values()) {
119       RecoveredContainerState rcsCopy = new RecoveredContainerState();
120       rcsCopy.status = rcs.status;
121       rcsCopy.exitCode = rcs.exitCode;
122       rcsCopy.killed = rcs.killed;
123       rcsCopy.diagnostics = rcs.diagnostics;
124       rcsCopy.startRequest = rcs.startRequest;
125       result.add(rcsCopy);
126     }
127     return new ArrayList<RecoveredContainerState>();
128   }
129 
130   @Override
storeContainer(ContainerId containerId, StartContainerRequest startRequest)131   public synchronized void storeContainer(ContainerId containerId,
132       StartContainerRequest startRequest) throws IOException {
133     RecoveredContainerState rcs = new RecoveredContainerState();
134     rcs.startRequest = startRequest;
135     containerStates.put(containerId, rcs);
136   }
137 
138   @Override
storeContainerDiagnostics(ContainerId containerId, StringBuilder diagnostics)139   public synchronized void storeContainerDiagnostics(ContainerId containerId,
140       StringBuilder diagnostics) throws IOException {
141     RecoveredContainerState rcs = getRecoveredContainerState(containerId);
142     rcs.diagnostics = diagnostics.toString();
143   }
144 
145   @Override
storeContainerLaunched(ContainerId containerId)146   public synchronized void storeContainerLaunched(ContainerId containerId)
147       throws IOException {
148     RecoveredContainerState rcs = getRecoveredContainerState(containerId);
149     if (rcs.exitCode != ContainerExitStatus.INVALID) {
150       throw new IOException("Container already completed");
151     }
152     rcs.status = RecoveredContainerStatus.LAUNCHED;
153   }
154 
155   @Override
storeContainerKilled(ContainerId containerId)156   public synchronized void storeContainerKilled(ContainerId containerId)
157       throws IOException {
158     RecoveredContainerState rcs = getRecoveredContainerState(containerId);
159     rcs.killed = true;
160   }
161 
162   @Override
storeContainerCompleted(ContainerId containerId, int exitCode)163   public synchronized void storeContainerCompleted(ContainerId containerId,
164       int exitCode) throws IOException {
165     RecoveredContainerState rcs = getRecoveredContainerState(containerId);
166     rcs.status = RecoveredContainerStatus.COMPLETED;
167     rcs.exitCode = exitCode;
168   }
169 
170   @Override
removeContainer(ContainerId containerId)171   public synchronized void removeContainer(ContainerId containerId)
172       throws IOException {
173     containerStates.remove(containerId);
174   }
175 
getRecoveredContainerState( ContainerId containerId)176   private RecoveredContainerState getRecoveredContainerState(
177       ContainerId containerId) throws IOException {
178     RecoveredContainerState rcs = containerStates.get(containerId);
179     if (rcs == null) {
180       throw new IOException("No start request for " + containerId);
181     }
182     return rcs;
183   }
184 
loadTrackerState(TrackerState ts)185   private LocalResourceTrackerState loadTrackerState(TrackerState ts) {
186     LocalResourceTrackerState result = new LocalResourceTrackerState();
187     result.localizedResources.addAll(ts.localizedResources.values());
188     for (Map.Entry<Path, LocalResourceProto> entry :
189          ts.inProgressMap.entrySet()) {
190       result.inProgressResources.put(entry.getValue(), entry.getKey());
191     }
192     return result;
193   }
194 
getTrackerState(TrackerKey key)195   private TrackerState getTrackerState(TrackerKey key) {
196     TrackerState ts = trackerStates.get(key);
197     if (ts == null) {
198       ts = new TrackerState();
199       trackerStates.put(key, ts);
200     }
201     return ts;
202   }
203 
204   @Override
loadLocalizationState()205   public synchronized RecoveredLocalizationState loadLocalizationState() {
206     RecoveredLocalizationState result = new RecoveredLocalizationState();
207     for (Map.Entry<TrackerKey, TrackerState> e : trackerStates.entrySet()) {
208       TrackerKey tk = e.getKey();
209       TrackerState ts = e.getValue();
210       // check what kind of tracker state we have and recover appropriately
211       // public trackers have user == null
212       // private trackers have a valid user but appId == null
213       // app-specific trackers have a valid user and valid appId
214       if (tk.user == null) {
215         result.publicTrackerState = loadTrackerState(ts);
216       } else {
217         RecoveredUserResources rur = result.userResources.get(tk.user);
218         if (rur == null) {
219           rur = new RecoveredUserResources();
220           result.userResources.put(tk.user, rur);
221         }
222         if (tk.appId == null) {
223           rur.privateTrackerState = loadTrackerState(ts);
224         } else {
225           rur.appTrackerStates.put(tk.appId, loadTrackerState(ts));
226         }
227       }
228     }
229     return result;
230   }
231 
232   @Override
startResourceLocalization(String user, ApplicationId appId, LocalResourceProto proto, Path localPath)233   public synchronized void startResourceLocalization(String user,
234       ApplicationId appId, LocalResourceProto proto, Path localPath) {
235     TrackerState ts = getTrackerState(new TrackerKey(user, appId));
236     ts.inProgressMap.put(localPath, proto);
237   }
238 
239   @Override
finishResourceLocalization(String user, ApplicationId appId, LocalizedResourceProto proto)240   public synchronized void finishResourceLocalization(String user,
241       ApplicationId appId, LocalizedResourceProto proto) {
242     TrackerState ts = getTrackerState(new TrackerKey(user, appId));
243     Path localPath = new Path(proto.getLocalPath());
244     ts.inProgressMap.remove(localPath);
245     ts.localizedResources.put(localPath, proto);
246   }
247 
248   @Override
removeLocalizedResource(String user, ApplicationId appId, Path localPath)249   public synchronized void removeLocalizedResource(String user,
250       ApplicationId appId, Path localPath) {
251     TrackerState ts = trackerStates.get(new TrackerKey(user, appId));
252     if (ts != null) {
253       ts.inProgressMap.remove(localPath);
254       ts.localizedResources.remove(localPath);
255     }
256   }
257 
258 
259   @Override
loadDeletionServiceState()260   public synchronized RecoveredDeletionServiceState loadDeletionServiceState()
261       throws IOException {
262     RecoveredDeletionServiceState result =
263         new RecoveredDeletionServiceState();
264     result.tasks = new ArrayList<DeletionServiceDeleteTaskProto>(
265         deleteTasks.values());
266     return result;
267   }
268 
269   @Override
storeDeletionTask(int taskId, DeletionServiceDeleteTaskProto taskProto)270   public synchronized void storeDeletionTask(int taskId,
271       DeletionServiceDeleteTaskProto taskProto) throws IOException {
272     deleteTasks.put(taskId, taskProto);
273   }
274 
275   @Override
removeDeletionTask(int taskId)276   public synchronized void removeDeletionTask(int taskId) throws IOException {
277     deleteTasks.remove(taskId);
278   }
279 
280 
281   @Override
loadNMTokensState()282   public synchronized RecoveredNMTokensState loadNMTokensState()
283       throws IOException {
284     // return a copy so caller can't modify our state
285     RecoveredNMTokensState result = new RecoveredNMTokensState();
286     result.currentMasterKey = nmTokenState.currentMasterKey;
287     result.previousMasterKey = nmTokenState.previousMasterKey;
288     result.applicationMasterKeys =
289         new HashMap<ApplicationAttemptId, MasterKey>(
290             nmTokenState.applicationMasterKeys);
291     return result;
292   }
293 
294   @Override
storeNMTokenCurrentMasterKey(MasterKey key)295   public synchronized void storeNMTokenCurrentMasterKey(MasterKey key)
296       throws IOException {
297     MasterKeyPBImpl keypb = (MasterKeyPBImpl) key;
298     nmTokenState.currentMasterKey = new MasterKeyPBImpl(keypb.getProto());
299   }
300 
301   @Override
storeNMTokenPreviousMasterKey(MasterKey key)302   public synchronized void storeNMTokenPreviousMasterKey(MasterKey key)
303       throws IOException {
304     MasterKeyPBImpl keypb = (MasterKeyPBImpl) key;
305     nmTokenState.previousMasterKey = new MasterKeyPBImpl(keypb.getProto());
306   }
307 
308   @Override
storeNMTokenApplicationMasterKey( ApplicationAttemptId attempt, MasterKey key)309   public synchronized void storeNMTokenApplicationMasterKey(
310       ApplicationAttemptId attempt, MasterKey key) throws IOException {
311     MasterKeyPBImpl keypb = (MasterKeyPBImpl) key;
312     nmTokenState.applicationMasterKeys.put(attempt,
313         new MasterKeyPBImpl(keypb.getProto()));
314   }
315 
316   @Override
removeNMTokenApplicationMasterKey( ApplicationAttemptId attempt)317   public synchronized void removeNMTokenApplicationMasterKey(
318       ApplicationAttemptId attempt) throws IOException {
319     nmTokenState.applicationMasterKeys.remove(attempt);
320   }
321 
322 
323   @Override
loadContainerTokensState()324   public synchronized RecoveredContainerTokensState loadContainerTokensState()
325       throws IOException {
326     // return a copy so caller can't modify our state
327     RecoveredContainerTokensState result =
328         new RecoveredContainerTokensState();
329     result.currentMasterKey = containerTokenState.currentMasterKey;
330     result.previousMasterKey = containerTokenState.previousMasterKey;
331     result.activeTokens =
332         new HashMap<ContainerId, Long>(containerTokenState.activeTokens);
333     return result;
334   }
335 
336   @Override
storeContainerTokenCurrentMasterKey(MasterKey key)337   public synchronized void storeContainerTokenCurrentMasterKey(MasterKey key)
338       throws IOException {
339     MasterKeyPBImpl keypb = (MasterKeyPBImpl) key;
340     containerTokenState.currentMasterKey =
341         new MasterKeyPBImpl(keypb.getProto());
342   }
343 
344   @Override
storeContainerTokenPreviousMasterKey(MasterKey key)345   public synchronized void storeContainerTokenPreviousMasterKey(MasterKey key)
346       throws IOException {
347     MasterKeyPBImpl keypb = (MasterKeyPBImpl) key;
348     containerTokenState.previousMasterKey =
349         new MasterKeyPBImpl(keypb.getProto());
350   }
351 
352   @Override
storeContainerToken(ContainerId containerId, Long expirationTime)353   public synchronized void storeContainerToken(ContainerId containerId,
354       Long expirationTime) throws IOException {
355     containerTokenState.activeTokens.put(containerId, expirationTime);
356   }
357 
358   @Override
removeContainerToken(ContainerId containerId)359   public synchronized void removeContainerToken(ContainerId containerId)
360       throws IOException {
361     containerTokenState.activeTokens.remove(containerId);
362   }
363 
364 
365   @Override
loadLogDeleterState()366   public synchronized RecoveredLogDeleterState loadLogDeleterState()
367       throws IOException {
368     RecoveredLogDeleterState state = new RecoveredLogDeleterState();
369     state.logDeleterMap = new HashMap<ApplicationId,LogDeleterProto>(
370         logDeleterState);
371     return state;
372   }
373 
374   @Override
storeLogDeleter(ApplicationId appId, LogDeleterProto proto)375   public synchronized void storeLogDeleter(ApplicationId appId,
376       LogDeleterProto proto)
377       throws IOException {
378     logDeleterState.put(appId, proto);
379   }
380 
381   @Override
removeLogDeleter(ApplicationId appId)382   public synchronized void removeLogDeleter(ApplicationId appId)
383       throws IOException {
384     logDeleterState.remove(appId);
385   }
386 
387 
388   private static class TrackerState {
389     Map<Path, LocalResourceProto> inProgressMap =
390         new HashMap<Path, LocalResourceProto>();
391     Map<Path, LocalizedResourceProto> localizedResources =
392         new HashMap<Path, LocalizedResourceProto>();
393   }
394 
395   private static class TrackerKey {
396     String user;
397     ApplicationId appId;
398 
TrackerKey(String user, ApplicationId appId)399     public TrackerKey(String user, ApplicationId appId) {
400       this.user = user;
401       this.appId = appId;
402     }
403 
404     @Override
hashCode()405     public int hashCode() {
406       final int prime = 31;
407       int result = 1;
408       result = prime * result + ((appId == null) ? 0 : appId.hashCode());
409       result = prime * result + ((user == null) ? 0 : user.hashCode());
410       return result;
411     }
412 
413     @Override
equals(Object obj)414     public boolean equals(Object obj) {
415       if (this == obj)
416         return true;
417       if (obj == null)
418         return false;
419       if (!(obj instanceof TrackerKey))
420         return false;
421       TrackerKey other = (TrackerKey) obj;
422       if (appId == null) {
423         if (other.appId != null)
424           return false;
425       } else if (!appId.equals(other.appId))
426         return false;
427       if (user == null) {
428         if (other.user != null)
429           return false;
430       } else if (!user.equals(other.user))
431         return false;
432       return true;
433     }
434   }
435 }
436