1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;
19 
20 import java.io.File;
21 import java.io.IOException;
22 import java.util.Iterator;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ConcurrentMap;
25 import java.util.concurrent.atomic.AtomicLong;
26 import java.util.regex.Matcher;
27 import java.util.regex.Pattern;
28 
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.apache.hadoop.conf.Configuration;
32 import org.apache.hadoop.fs.Path;
33 import org.apache.hadoop.yarn.api.records.ApplicationId;
34 import org.apache.hadoop.yarn.api.records.LocalResource;
35 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
36 import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
37 import org.apache.hadoop.yarn.event.Dispatcher;
38 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
39 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
40 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
41 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
42 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
43 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
44 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
45 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
46 
47 import com.google.common.annotations.VisibleForTesting;
48 
49 
50 /**
51  * A collection of {@link LocalizedResource}s all of same
52  * {@link LocalResourceVisibility}.
53  *
54  */
55 
56 class LocalResourcesTrackerImpl implements LocalResourcesTracker {
57 
58   static final Log LOG = LogFactory.getLog(LocalResourcesTrackerImpl.class);
59   private static final String RANDOM_DIR_REGEX = "-?\\d+";
60   private static final Pattern RANDOM_DIR_PATTERN = Pattern
61       .compile(RANDOM_DIR_REGEX);
62 
63   private final String user;
64   private final ApplicationId appId;
65   private final Dispatcher dispatcher;
66   private final ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc;
67   private Configuration conf;
68   /*
69    * This flag controls whether this resource tracker uses hierarchical
70    * directories or not. For PRIVATE and PUBLIC resource trackers it
71    * will be set whereas for APPLICATION resource tracker it would
72    * be false.
73    */
74   private final boolean useLocalCacheDirectoryManager;
75   private ConcurrentHashMap<Path, LocalCacheDirectoryManager> directoryManagers;
76   /*
77    * It is used to keep track of resource into hierarchical directory
78    * while it is getting downloaded. It is useful for reference counting
79    * in case resource localization fails.
80    */
81   private ConcurrentHashMap<LocalResourceRequest, Path>
82     inProgressLocalResourcesMap;
83   /*
84    * starting with 10 to accommodate 0-9 directories created as a part of
85    * LocalCacheDirectoryManager. So there will be one unique number generator
86    * per APPLICATION, USER and PUBLIC cache.
87    */
88   private AtomicLong uniqueNumberGenerator = new AtomicLong(9);
89   private NMStateStoreService stateStore;
90 
LocalResourcesTrackerImpl(String user, ApplicationId appId, Dispatcher dispatcher, boolean useLocalCacheDirectoryManager, Configuration conf, NMStateStoreService stateStore)91   public LocalResourcesTrackerImpl(String user, ApplicationId appId,
92       Dispatcher dispatcher, boolean useLocalCacheDirectoryManager,
93       Configuration conf, NMStateStoreService stateStore) {
94     this(user, appId, dispatcher,
95       new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>(),
96       useLocalCacheDirectoryManager, conf, stateStore);
97   }
98 
LocalResourcesTrackerImpl(String user, ApplicationId appId, Dispatcher dispatcher, ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc, boolean useLocalCacheDirectoryManager, Configuration conf, NMStateStoreService stateStore)99   LocalResourcesTrackerImpl(String user, ApplicationId appId,
100       Dispatcher dispatcher,
101       ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc,
102       boolean useLocalCacheDirectoryManager, Configuration conf,
103       NMStateStoreService stateStore) {
104     this.appId = appId;
105     this.user = user;
106     this.dispatcher = dispatcher;
107     this.localrsrc = localrsrc;
108     this.useLocalCacheDirectoryManager = useLocalCacheDirectoryManager;
109     if ( this.useLocalCacheDirectoryManager) {
110       directoryManagers = new ConcurrentHashMap<Path, LocalCacheDirectoryManager>();
111       inProgressLocalResourcesMap =
112         new ConcurrentHashMap<LocalResourceRequest, Path>();
113     }
114     this.conf = conf;
115     this.stateStore = stateStore;
116   }
117 
118   /*
119    * Synchronizing this method for avoiding races due to multiple ResourceEvent's
120    * coming to LocalResourcesTracker from Public/Private localizer and
121    * Resource Localization Service.
122    */
123   @Override
handle(ResourceEvent event)124   public synchronized void handle(ResourceEvent event) {
125     LocalResourceRequest req = event.getLocalResourceRequest();
126     LocalizedResource rsrc = localrsrc.get(req);
127     switch (event.getType()) {
128     case LOCALIZED:
129       if (useLocalCacheDirectoryManager) {
130         inProgressLocalResourcesMap.remove(req);
131       }
132       break;
133     case REQUEST:
134       if (rsrc != null && (!isResourcePresent(rsrc))) {
135         LOG.info("Resource " + rsrc.getLocalPath()
136             + " is missing, localizing it again");
137         removeResource(req);
138         rsrc = null;
139       }
140       if (null == rsrc) {
141         rsrc = new LocalizedResource(req, dispatcher);
142         localrsrc.put(req, rsrc);
143       }
144       break;
145     case RELEASE:
146       if (null == rsrc) {
147         // The container sent a release event on a resource which
148         // 1) Failed
149         // 2) Removed for some reason (ex. disk is no longer accessible)
150         ResourceReleaseEvent relEvent = (ResourceReleaseEvent) event;
151         LOG.info("Container " + relEvent.getContainer()
152             + " sent RELEASE event on a resource request " + req
153             + " not present in cache.");
154         return;
155       }
156       break;
157     case LOCALIZATION_FAILED:
158       /*
159        * If resource localization fails then Localized resource will be
160        * removed from local cache.
161        */
162       removeResource(req);
163       break;
164     case RECOVERED:
165       if (rsrc != null) {
166         LOG.warn("Ignoring attempt to recover existing resource " + rsrc);
167         return;
168       }
169       rsrc = recoverResource(req, (ResourceRecoveredEvent) event);
170       localrsrc.put(req, rsrc);
171       break;
172     }
173 
174     if (rsrc == null) {
175       LOG.warn("Received " + event.getType() + " event for request " + req
176           + " but localized resource is missing");
177       return;
178     }
179     rsrc.handle(event);
180 
181     // Remove the resource if its downloading and its reference count has
182     // become 0 after RELEASE. This maybe because a container was killed while
183     // localizing and no other container is referring to the resource.
184     // NOTE: This should NOT be done for public resources since the
185     //       download is not associated with a container-specific localizer.
186     if (event.getType() == ResourceEventType.RELEASE) {
187       if (rsrc.getState() == ResourceState.DOWNLOADING &&
188           rsrc.getRefCount() <= 0 &&
189           rsrc.getRequest().getVisibility() != LocalResourceVisibility.PUBLIC) {
190         removeResource(req);
191       }
192     }
193 
194     if (event.getType() == ResourceEventType.LOCALIZED) {
195       if (rsrc.getLocalPath() != null) {
196         try {
197           stateStore.finishResourceLocalization(user, appId,
198               buildLocalizedResourceProto(rsrc));
199         } catch (IOException ioe) {
200           LOG.error("Error storing resource state for " + rsrc, ioe);
201         }
202       } else {
203         LOG.warn("Resource " + rsrc + " localized without a location");
204       }
205     }
206   }
207 
recoverResource(LocalResourceRequest req, ResourceRecoveredEvent event)208   private LocalizedResource recoverResource(LocalResourceRequest req,
209       ResourceRecoveredEvent event) {
210     // unique number for a resource is the directory of the resource
211     Path localDir = event.getLocalPath().getParent();
212     long rsrcId = Long.parseLong(localDir.getName());
213 
214     // update ID generator to avoid conflicts with existing resources
215     while (true) {
216       long currentRsrcId = uniqueNumberGenerator.get();
217       long nextRsrcId = Math.max(currentRsrcId, rsrcId);
218       if (uniqueNumberGenerator.compareAndSet(currentRsrcId, nextRsrcId)) {
219         break;
220       }
221     }
222 
223     incrementFileCountForLocalCacheDirectory(localDir.getParent());
224 
225     return new LocalizedResource(req, dispatcher);
226   }
227 
buildLocalizedResourceProto( LocalizedResource rsrc)228   private LocalizedResourceProto buildLocalizedResourceProto(
229       LocalizedResource rsrc) {
230     return LocalizedResourceProto.newBuilder()
231         .setResource(buildLocalResourceProto(rsrc.getRequest()))
232         .setLocalPath(rsrc.getLocalPath().toString())
233         .setSize(rsrc.getSize())
234         .build();
235   }
236 
buildLocalResourceProto(LocalResource lr)237   private LocalResourceProto buildLocalResourceProto(LocalResource lr) {
238     LocalResourcePBImpl lrpb;
239     if (!(lr instanceof LocalResourcePBImpl)) {
240       lr = LocalResource.newInstance(lr.getResource(), lr.getType(),
241           lr.getVisibility(), lr.getSize(), lr.getTimestamp(),
242           lr.getPattern());
243     }
244     lrpb = (LocalResourcePBImpl) lr;
245     return lrpb.getProto();
246   }
247 
incrementFileCountForLocalCacheDirectory(Path cacheDir)248   public void incrementFileCountForLocalCacheDirectory(Path cacheDir) {
249     if (useLocalCacheDirectoryManager) {
250       Path cacheRoot = LocalCacheDirectoryManager.getCacheDirectoryRoot(
251           cacheDir);
252       if (cacheRoot != null) {
253         LocalCacheDirectoryManager dir = directoryManagers.get(cacheRoot);
254         if (dir == null) {
255           dir = new LocalCacheDirectoryManager(conf);
256           LocalCacheDirectoryManager otherDir =
257               directoryManagers.putIfAbsent(cacheRoot, dir);
258           if (otherDir != null) {
259             dir = otherDir;
260           }
261         }
262         if (cacheDir.equals(cacheRoot)) {
263           dir.incrementFileCountForPath("");
264         } else {
265           String dirStr = cacheDir.toUri().getRawPath();
266           String rootStr = cacheRoot.toUri().getRawPath();
267           dir.incrementFileCountForPath(
268               dirStr.substring(rootStr.length() + 1));
269         }
270       }
271     }
272   }
273 
274   /*
275    * Update the file-count statistics for a local cache-directory.
276    * This will retrieve the localized path for the resource from
277    * 1) inProgressRsrcMap if the resource was under localization and it
278    * failed.
279    * 2) LocalizedResource if the resource is already localized.
280    * From this path it will identify the local directory under which the
281    * resource was localized. Then rest of the path will be used to decrement
282    * file count for the HierarchicalSubDirectory pointing to this relative
283    * path.
284    */
decrementFileCountForLocalCacheDirectory(LocalResourceRequest req, LocalizedResource rsrc)285   private void decrementFileCountForLocalCacheDirectory(LocalResourceRequest req,
286       LocalizedResource rsrc) {
287     if ( useLocalCacheDirectoryManager) {
288       Path rsrcPath = null;
289       if (inProgressLocalResourcesMap.containsKey(req)) {
290         // This happens when localization of a resource fails.
291         rsrcPath = inProgressLocalResourcesMap.remove(req);
292       } else if (rsrc != null && rsrc.getLocalPath() != null) {
293         rsrcPath = rsrc.getLocalPath().getParent().getParent();
294       }
295       if (rsrcPath != null) {
296         Path parentPath = new Path(rsrcPath.toUri().getRawPath());
297         while (!directoryManagers.containsKey(parentPath)) {
298           parentPath = parentPath.getParent();
299           if ( parentPath == null) {
300             return;
301           }
302         }
303         if ( parentPath != null) {
304           String parentDir = parentPath.toUri().getRawPath().toString();
305           LocalCacheDirectoryManager dir = directoryManagers.get(parentPath);
306           String rsrcDir = rsrcPath.toUri().getRawPath();
307           if (rsrcDir.equals(parentDir)) {
308             dir.decrementFileCountForPath("");
309           } else {
310             dir.decrementFileCountForPath(
311               rsrcDir.substring(
312               parentDir.length() + 1));
313           }
314         }
315       }
316     }
317   }
318 
319 /**
320    * This module checks if the resource which was localized is already present
321    * or not
322    *
323    * @param rsrc
324    * @return true/false based on resource is present or not
325    */
isResourcePresent(LocalizedResource rsrc)326   public boolean isResourcePresent(LocalizedResource rsrc) {
327     boolean ret = true;
328     if (rsrc.getState() == ResourceState.LOCALIZED) {
329       File file = new File(rsrc.getLocalPath().toUri().getRawPath().
330         toString());
331       if (!file.exists()) {
332         ret = false;
333       }
334     }
335     return ret;
336   }
337 
338   @Override
remove(LocalizedResource rem, DeletionService delService)339   public boolean remove(LocalizedResource rem, DeletionService delService) {
340  // current synchronization guaranteed by crude RLS event for cleanup
341     LocalizedResource rsrc = localrsrc.get(rem.getRequest());
342     if (null == rsrc) {
343       LOG.error("Attempt to remove absent resource: " + rem.getRequest()
344           + " from " + getUser());
345       return true;
346     }
347     if (rsrc.getRefCount() > 0
348         || ResourceState.DOWNLOADING.equals(rsrc.getState()) || rsrc != rem) {
349       // internal error
350       LOG.error("Attempt to remove resource: " + rsrc
351           + " with non-zero refcount");
352       return false;
353     } else { // ResourceState is LOCALIZED or INIT
354       if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
355         delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath()));
356       }
357       removeResource(rem.getRequest());
358       LOG.info("Removed " + rsrc.getLocalPath() + " from localized cache");
359       return true;
360     }
361   }
362 
removeResource(LocalResourceRequest req)363   private void removeResource(LocalResourceRequest req) {
364     LocalizedResource rsrc = localrsrc.remove(req);
365     decrementFileCountForLocalCacheDirectory(req, rsrc);
366     if (rsrc != null) {
367       Path localPath = rsrc.getLocalPath();
368       if (localPath != null) {
369         try {
370           stateStore.removeLocalizedResource(user, appId, localPath);
371         } catch (IOException e) {
372           LOG.error("Unable to remove resource " + rsrc + " from state store",
373               e);
374         }
375       }
376     }
377   }
378 
379   /**
380    * Returns the path up to the random directory component.
381    */
getPathToDelete(Path localPath)382   private Path getPathToDelete(Path localPath) {
383     Path delPath = localPath.getParent();
384     String name = delPath.getName();
385     Matcher matcher = RANDOM_DIR_PATTERN.matcher(name);
386     if (matcher.matches()) {
387       return delPath;
388     } else {
389       LOG.warn("Random directory component did not match. " +
390       		"Deleting localized path only");
391       return localPath;
392     }
393   }
394 
395   @Override
getUser()396   public String getUser() {
397     return user;
398   }
399 
400   @Override
iterator()401   public Iterator<LocalizedResource> iterator() {
402     return localrsrc.values().iterator();
403   }
404 
405   /**
406    * @return {@link Path} absolute path for localization which includes local
407    *         directory path and the relative hierarchical path (if use local
408    *         cache directory manager is enabled)
409    *
410    * @param {@link LocalResourceRequest} Resource localization request to
411    *        localize the resource.
412    * @param {@link Path} local directory path
413    * @param {@link DeletionService} Deletion Service to delete existing
414    *        path for localization.
415    */
416   @Override
getPathForLocalization(LocalResourceRequest req, Path localDirPath, DeletionService delService)417   public Path getPathForLocalization(LocalResourceRequest req,
418       Path localDirPath, DeletionService delService) {
419     Path rPath = localDirPath;
420     if (useLocalCacheDirectoryManager && localDirPath != null) {
421 
422       if (!directoryManagers.containsKey(localDirPath)) {
423         directoryManagers.putIfAbsent(localDirPath,
424           new LocalCacheDirectoryManager(conf));
425       }
426       LocalCacheDirectoryManager dir = directoryManagers.get(localDirPath);
427 
428       rPath = localDirPath;
429       String hierarchicalPath = dir.getRelativePathForLocalization();
430       // For most of the scenarios we will get root path only which
431       // is an empty string
432       if (!hierarchicalPath.isEmpty()) {
433         rPath = new Path(localDirPath, hierarchicalPath);
434       }
435       inProgressLocalResourcesMap.put(req, rPath);
436     }
437 
438     while (true) {
439       Path uniquePath = new Path(rPath,
440           Long.toString(uniqueNumberGenerator.incrementAndGet()));
441       File file = new File(uniquePath.toUri().getRawPath());
442       if (!file.exists()) {
443         rPath = uniquePath;
444         break;
445       }
446       // If the directory already exists, delete it and move to next one.
447       LOG.warn("Directory " + uniquePath + " already exists, " +
448           "try next one.");
449       if (delService != null) {
450         delService.delete(getUser(), uniquePath);
451       }
452     }
453 
454     Path localPath = new Path(rPath, req.getPath().getName());
455     LocalizedResource rsrc = localrsrc.get(req);
456     rsrc.setLocalPath(localPath);
457     LocalResource lr = LocalResource.newInstance(req.getResource(),
458         req.getType(), req.getVisibility(), req.getSize(),
459         req.getTimestamp());
460     try {
461       stateStore.startResourceLocalization(user, appId,
462           ((LocalResourcePBImpl) lr).getProto(), localPath);
463     } catch (IOException e) {
464       LOG.error("Unable to record localization start for " + rsrc, e);
465     }
466     return rPath;
467   }
468 
469   @Override
getLocalizedResource(LocalResourceRequest request)470   public LocalizedResource getLocalizedResource(LocalResourceRequest request) {
471     return localrsrc.get(request);
472   }
473 
474   @VisibleForTesting
getDirectoryManager(Path localDirPath)475   LocalCacheDirectoryManager getDirectoryManager(Path localDirPath) {
476     LocalCacheDirectoryManager mgr = null;
477     if (useLocalCacheDirectoryManager) {
478       mgr = directoryManagers.get(localDirPath);
479     }
480     return mgr;
481   }
482 }
483