/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;

import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.impl.pb.LocalResourcePBImpl;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRecoveredEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;

import com.google.common.annotations.VisibleForTesting;


/**
 * A collection of {@link LocalizedResource}s all of same
 * {@link LocalResourceVisibility}.
 *
 * <p>The tracker maps each {@link LocalResourceRequest} to its
 * {@link LocalizedResource}, hands out unique on-disk directories for new
 * localizations, optionally keeps per-directory file counts through
 * {@link LocalCacheDirectoryManager}, and records localization progress in
 * the {@link NMStateStoreService}.
 */
class LocalResourcesTrackerImpl implements LocalResourcesTracker {

  static final Log LOG = LogFactory.getLog(LocalResourcesTrackerImpl.class);
  private static final String RANDOM_DIR_REGEX = "-?\\d+";
  private static final Pattern RANDOM_DIR_PATTERN = Pattern
      .compile(RANDOM_DIR_REGEX);

  private final String user;
  private final ApplicationId appId;
  private final Dispatcher dispatcher;
  private final ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc;
  private final Configuration conf;
  /*
   * This flag controls whether this resource tracker uses hierarchical
   * directories or not. For PRIVATE and PUBLIC resource trackers it
   * will be set whereas for APPLICATION resource tracker it would
   * be false.
   */
  private final boolean useLocalCacheDirectoryManager;
  /*
   * One directory manager per local cache root. Only allocated (non-null)
   * when useLocalCacheDirectoryManager is set.
   */
  private final ConcurrentMap<Path, LocalCacheDirectoryManager>
      directoryManagers;
  /*
   * It is used to keep track of resource into hierarchical directory
   * while it is getting downloaded. It is useful for reference counting
   * in case resource localization fails. Only allocated (non-null) when
   * useLocalCacheDirectoryManager is set.
   */
  private final ConcurrentMap<LocalResourceRequest, Path>
      inProgressLocalResourcesMap;
  /*
   * starting with 10 to accommodate 0-9 directories created as a part of
   * LocalCacheDirectoryManager. So there will be one unique number generator
   * per APPLICATION, USER and PUBLIC cache. The counter is seeded with 9
   * because ids are taken with incrementAndGet(), so the first id handed
   * out is 10.
   */
  private final AtomicLong uniqueNumberGenerator = new AtomicLong(9);
  private final NMStateStoreService stateStore;

  /**
   * Creates a tracker backed by a new, empty resource map.
   *
   * @param user user this tracker reports from {@link #getUser()} and
   *          passes to the state store and deletion service
   * @param appId application id passed to the state store
   * @param dispatcher dispatcher handed to every {@link LocalizedResource}
   *          created by this tracker
   * @param useLocalCacheDirectoryManager whether hierarchical cache
   *          directories and their file counts are maintained
   * @param conf configuration used to create
   *          {@link LocalCacheDirectoryManager} instances
   * @param stateStore store in which localization progress is recorded
   */
  public LocalResourcesTrackerImpl(String user, ApplicationId appId,
      Dispatcher dispatcher, boolean useLocalCacheDirectoryManager,
      Configuration conf, NMStateStoreService stateStore) {
    this(user, appId, dispatcher,
        new ConcurrentHashMap<LocalResourceRequest, LocalizedResource>(),
        useLocalCacheDirectoryManager, conf, stateStore);
  }

  /**
   * Creates a tracker that uses the supplied map as its resource cache.
   *
   * @param user user this tracker reports from {@link #getUser()} and
   *          passes to the state store and deletion service
   * @param appId application id passed to the state store
   * @param dispatcher dispatcher handed to every {@link LocalizedResource}
   *          created by this tracker
   * @param localrsrc map used to hold the tracked resources
   * @param useLocalCacheDirectoryManager whether hierarchical cache
   *          directories and their file counts are maintained
   * @param conf configuration used to create
   *          {@link LocalCacheDirectoryManager} instances
   * @param stateStore store in which localization progress is recorded
   */
  LocalResourcesTrackerImpl(String user, ApplicationId appId,
      Dispatcher dispatcher,
      ConcurrentMap<LocalResourceRequest,LocalizedResource> localrsrc,
      boolean useLocalCacheDirectoryManager, Configuration conf,
      NMStateStoreService stateStore) {
    this.appId = appId;
    this.user = user;
    this.dispatcher = dispatcher;
    this.localrsrc = localrsrc;
    this.useLocalCacheDirectoryManager = useLocalCacheDirectoryManager;
    if (useLocalCacheDirectoryManager) {
      this.directoryManagers =
          new ConcurrentHashMap<Path, LocalCacheDirectoryManager>();
      this.inProgressLocalResourcesMap =
          new ConcurrentHashMap<LocalResourceRequest, Path>();
    } else {
      // Every use of these maps is guarded by useLocalCacheDirectoryManager.
      this.directoryManagers = null;
      this.inProgressLocalResourcesMap = null;
    }
    this.conf = conf;
    this.stateStore = stateStore;
  }

  /**
   * Applies a {@link ResourceEvent} to the tracked resource it refers to.
   *
   * <p>Synchronizing this method for avoiding races due to multiple
   * ResourceEvent's coming to LocalResourcesTracker from Public/Private
   * localizer and Resource Localization Service.
   *
   * <p>Per event type, before the event is forwarded to the resource:
   * <ul>
   * <li>LOCALIZED: the request is dropped from the in-progress map.</li>
   * <li>REQUEST: a tracked resource whose file is missing is removed, and a
   * new {@link LocalizedResource} is created when none is tracked.</li>
   * <li>RELEASE: ignored (with a log message) when nothing is tracked.</li>
   * <li>LOCALIZATION_FAILED: the resource is removed from the cache.</li>
   * <li>RECOVERED: ignored when the resource is already tracked, otherwise a
   * resource is rebuilt from the recovered path and tracked.</li>
   * </ul>
   *
   * @param event the event to apply
   */
  @Override
  public synchronized void handle(ResourceEvent event) {
    LocalResourceRequest req = event.getLocalResourceRequest();
    LocalizedResource rsrc = localrsrc.get(req);
    switch (event.getType()) {
    case LOCALIZED:
      if (useLocalCacheDirectoryManager) {
        inProgressLocalResourcesMap.remove(req);
      }
      break;
    case REQUEST:
      if (rsrc != null && (!isResourcePresent(rsrc))) {
        LOG.info("Resource " + rsrc.getLocalPath()
            + " is missing, localizing it again");
        removeResource(req);
        rsrc = null;
      }
      if (null == rsrc) {
        rsrc = new LocalizedResource(req, dispatcher);
        localrsrc.put(req, rsrc);
      }
      break;
    case RELEASE:
      if (null == rsrc) {
        // The container sent a release event on a resource which
        // 1) Failed
        // 2) Removed for some reason (ex. disk is no longer accessible)
        ResourceReleaseEvent relEvent = (ResourceReleaseEvent) event;
        LOG.info("Container " + relEvent.getContainer()
            + " sent RELEASE event on a resource request " + req
            + " not present in cache.");
        return;
      }
      break;
    case LOCALIZATION_FAILED:
      /*
       * If resource localization fails then Localized resource will be
       * removed from local cache. The local reference fetched above is
       * kept so the event is still delivered to the resource below.
       */
      removeResource(req);
      break;
    case RECOVERED:
      if (rsrc != null) {
        LOG.warn("Ignoring attempt to recover existing resource " + rsrc);
        return;
      }
      rsrc = recoverResource(req, (ResourceRecoveredEvent) event);
      localrsrc.put(req, rsrc);
      break;
    default:
      // No tracker-level bookkeeping for any other event type.
      break;
    }

    if (rsrc == null) {
      LOG.warn("Received " + event.getType() + " event for request " + req
          + " but localized resource is missing");
      return;
    }
    rsrc.handle(event);

    // Remove the resource if its downloading and its reference count has
    // become 0 after RELEASE. This maybe because a container was killed while
    // localizing and no other container is referring to the resource.
    // NOTE: This should NOT be done for public resources since the
    // download is not associated with a container-specific localizer.
    if (event.getType() == ResourceEventType.RELEASE) {
      if (rsrc.getState() == ResourceState.DOWNLOADING &&
          rsrc.getRefCount() <= 0 &&
          rsrc.getRequest().getVisibility() != LocalResourceVisibility.PUBLIC) {
        removeResource(req);
      }
    }

    if (event.getType() == ResourceEventType.LOCALIZED) {
      if (rsrc.getLocalPath() != null) {
        try {
          stateStore.finishResourceLocalization(user, appId,
              buildLocalizedResourceProto(rsrc));
        } catch (IOException ioe) {
          // Best effort: the failure is logged and the event still succeeds.
          LOG.error("Error storing resource state for " + rsrc, ioe);
        }
      } else {
        LOG.warn("Resource " + rsrc + " localized without a location");
      }
    }
  }

  /**
   * Rebuilds the tracker bookkeeping for a recovered resource.
   *
   * <p>The name of the directory that contains the recovered file is parsed
   * as the resource's unique number; the id generator is raised to at least
   * that number and the file count of the enclosing cache directory is
   * incremented.
   *
   * @param req the request the recovered resource belongs to
   * @param event the recovery event carrying the local path
   * @return a new {@link LocalizedResource} for {@code req}
   * @throws NumberFormatException if the parent directory name of the
   *           recovered path is not a parsable long
   */
  private LocalizedResource recoverResource(LocalResourceRequest req,
      ResourceRecoveredEvent event) {
    // unique number for a resource is the directory of the resource
    Path localDir = event.getLocalPath().getParent();
    long rsrcId = Long.parseLong(localDir.getName());

    // update ID generator to avoid conflicts with existing resources
    while (true) {
      long currentRsrcId = uniqueNumberGenerator.get();
      long nextRsrcId = Math.max(currentRsrcId, rsrcId);
      if (uniqueNumberGenerator.compareAndSet(currentRsrcId, nextRsrcId)) {
        break;
      }
    }

    incrementFileCountForLocalCacheDirectory(localDir.getParent());

    return new LocalizedResource(req, dispatcher);
  }

  /**
   * Builds the state-store record for a localized resource.
   *
   * @param rsrc resource whose request, local path and size are recorded;
   *          its local path must not be null
   * @return the populated {@link LocalizedResourceProto}
   */
  private LocalizedResourceProto buildLocalizedResourceProto(
      LocalizedResource rsrc) {
    return LocalizedResourceProto.newBuilder()
        .setResource(buildLocalResourceProto(rsrc.getRequest()))
        .setLocalPath(rsrc.getLocalPath().toString())
        .setSize(rsrc.getSize())
        .build();
  }

  /**
   * Converts a {@link LocalResource} into its protobuf form.
   *
   * @param lr the resource to convert; when it is not already a
   *          {@link LocalResourcePBImpl} it is first copied through
   *          {@link LocalResource#newInstance}
   * @return the protobuf message of the resource
   */
  private LocalResourceProto buildLocalResourceProto(LocalResource lr) {
    LocalResourcePBImpl lrpb;
    if (!(lr instanceof LocalResourcePBImpl)) {
      lr = LocalResource.newInstance(lr.getResource(), lr.getType(),
          lr.getVisibility(), lr.getSize(), lr.getTimestamp(),
          lr.getPattern());
    }
    // NOTE(review): assumes LocalResource.newInstance returns a
    // LocalResourcePBImpl; confirm against the records factory.
    lrpb = (LocalResourcePBImpl) lr;
    return lrpb.getProto();
  }

  /**
   * Returns the directory manager registered for the given cache root,
   * registering a new one first when none exists yet.
   *
   * @param cacheRoot cache root directory used as the map key
   * @return the manager stored in {@code directoryManagers} for the root
   */
  private LocalCacheDirectoryManager getOrCreateDirectoryManager(
      Path cacheRoot) {
    LocalCacheDirectoryManager dir = directoryManagers.get(cacheRoot);
    if (dir == null) {
      dir = new LocalCacheDirectoryManager(conf);
      LocalCacheDirectoryManager otherDir =
          directoryManagers.putIfAbsent(cacheRoot, dir);
      if (otherDir != null) {
        // Another caller registered a manager first; use that one.
        dir = otherDir;
      }
    }
    return dir;
  }

  /**
   * Increments the file count of a local cache directory. Does nothing
   * unless the local cache directory manager is enabled and a cache root
   * can be derived from {@code cacheDir}.
   *
   * @param cacheDir directory (cache root or one of its hierarchical
   *          sub-directories) whose file count is incremented
   */
  public void incrementFileCountForLocalCacheDirectory(Path cacheDir) {
    if (!useLocalCacheDirectoryManager) {
      return;
    }
    Path cacheRoot = LocalCacheDirectoryManager.getCacheDirectoryRoot(
        cacheDir);
    if (cacheRoot == null) {
      return;
    }
    LocalCacheDirectoryManager dir = getOrCreateDirectoryManager(cacheRoot);
    if (cacheDir.equals(cacheRoot)) {
      dir.incrementFileCountForPath("");
    } else {
      String dirStr = cacheDir.toUri().getRawPath();
      String rootStr = cacheRoot.toUri().getRawPath();
      // Path relative to the root; "+ 1" skips the separator after the root.
      dir.incrementFileCountForPath(
          dirStr.substring(rootStr.length() + 1));
    }
  }

  /**
   * Update the file-count statistics for a local cache-directory.
   * This will retrieve the localized path for the resource from
   * 1) inProgressLocalResourcesMap if the resource was under localization
   *    and it failed.
   * 2) LocalizedResource if the resource is already localized.
   * From this path it will identify the local directory under which the
   * resource was localized. Then rest of the path will be used to decrement
   * file count for the HierarchicalSubDirectory pointing to this relative
   * path.
   *
   * @param req request whose in-progress entry (if any) is removed
   * @param rsrc tracked resource for the request, may be null
   */
  private void decrementFileCountForLocalCacheDirectory(LocalResourceRequest req,
      LocalizedResource rsrc) {
    if (!useLocalCacheDirectoryManager) {
      return;
    }
    // An in-progress entry exists when localization of a resource fails.
    // The map holds no null values, so null here means "no entry".
    Path rsrcPath = inProgressLocalResourcesMap.remove(req);
    if (rsrcPath == null && rsrc != null && rsrc.getLocalPath() != null) {
      // <cache dir>/<unique number>/<file name> -> <cache dir>
      rsrcPath = rsrc.getLocalPath().getParent().getParent();
    }
    if (rsrcPath == null) {
      return;
    }

    // Walk up until a directory with a registered manager is found.
    Path parentPath = new Path(rsrcPath.toUri().getRawPath());
    LocalCacheDirectoryManager dir = directoryManagers.get(parentPath);
    while (dir == null) {
      parentPath = parentPath.getParent();
      if (parentPath == null) {
        return;
      }
      dir = directoryManagers.get(parentPath);
    }

    String parentDir = parentPath.toUri().getRawPath();
    String rsrcDir = rsrcPath.toUri().getRawPath();
    if (rsrcDir.equals(parentDir)) {
      dir.decrementFileCountForPath("");
    } else {
      dir.decrementFileCountForPath(
          rsrcDir.substring(parentDir.length() + 1));
    }
  }

  /**
   * This module checks if the resource which was localized is already present
   * or not.
   *
   * @param rsrc the resource to check
   * @return false only when the resource is in the LOCALIZED state and its
   *         local file does not exist; true otherwise
   */
  public boolean isResourcePresent(LocalizedResource rsrc) {
    if (rsrc.getState() == ResourceState.LOCALIZED) {
      File file = new File(rsrc.getLocalPath().toUri().getRawPath());
      return file.exists();
    }
    return true;
  }

  /**
   * Removes a resource from this tracker and, when it is in the LOCALIZED
   * state, schedules its on-disk location for deletion.
   *
   * @param rem the resource to remove
   * @param delService service used to delete the localized path
   * @return true when the resource is no longer tracked afterwards (this
   *         includes the case where it was not tracked to begin with);
   *         false when removal was refused
   */
  @Override
  public boolean remove(LocalizedResource rem, DeletionService delService) {
    // current synchronization guaranteed by crude RLS event for cleanup
    LocalizedResource rsrc = localrsrc.get(rem.getRequest());
    if (null == rsrc) {
      LOG.error("Attempt to remove absent resource: " + rem.getRequest()
          + " from " + getUser());
      return true;
    }
    if (rsrc.getRefCount() > 0
        || ResourceState.DOWNLOADING.equals(rsrc.getState()) || rsrc != rem) {
      // internal error: still referenced, still downloading, or the caller
      // passed an instance other than the tracked one
      LOG.error("Attempt to remove resource: " + rsrc
          + " with non-zero refcount");
      return false;
    } else { // ResourceState is LOCALIZED or INIT
      if (ResourceState.LOCALIZED.equals(rsrc.getState())) {
        delService.delete(getUser(), getPathToDelete(rsrc.getLocalPath()));
      }
      removeResource(rem.getRequest());
      LOG.info("Removed " + rsrc.getLocalPath() + " from localized cache");
      return true;
    }
  }

  /**
   * Drops a request from the cache, updates the directory file counts and
   * removes the resource's local path from the state store.
   *
   * @param req the request to stop tracking
   */
  private void removeResource(LocalResourceRequest req) {
    LocalizedResource rsrc = localrsrc.remove(req);
    decrementFileCountForLocalCacheDirectory(req, rsrc);
    if (rsrc != null) {
      Path localPath = rsrc.getLocalPath();
      if (localPath != null) {
        try {
          stateStore.removeLocalizedResource(user, appId, localPath);
        } catch (IOException e) {
          // Best effort: the failure is logged and removal continues.
          LOG.error("Unable to remove resource " + rsrc + " from state store",
              e);
        }
      }
    }
  }

  /**
   * Returns the path up to the random directory component.
   *
   * @param localPath local path of a localized resource
   * @return the parent of {@code localPath} when the parent's name matches
   *         the random-directory pattern, otherwise {@code localPath} itself
   */
  private Path getPathToDelete(Path localPath) {
    Path delPath = localPath.getParent();
    String name = delPath.getName();
    Matcher matcher = RANDOM_DIR_PATTERN.matcher(name);
    if (matcher.matches()) {
      return delPath;
    } else {
      LOG.warn("Random directory component did not match. " +
          "Deleting localized path only");
      return localPath;
    }
  }

  /**
   * @return the user this tracker was created with
   */
  @Override
  public String getUser() {
    return user;
  }

  /**
   * @return an iterator over the currently tracked resources
   */
  @Override
  public Iterator<LocalizedResource> iterator() {
    return localrsrc.values().iterator();
  }

  /**
   * Chooses a directory for localizing a resource, records it as the
   * resource's local path and notes the start of localization in the state
   * store.
   *
   * @param req Resource localization request to localize the resource.
   * @param localDirPath local directory path
   * @param delService Deletion Service to delete existing path for
   *          localization; may be null, in which case colliding directories
   *          are skipped without being deleted.
   * @return {@link Path} absolute path for localization which includes local
   *         directory path, the relative hierarchical path (if use local
   *         cache directory manager is enabled) and a unique number
   *         directory
   */
  @Override
  public Path getPathForLocalization(LocalResourceRequest req,
      Path localDirPath, DeletionService delService) {
    Path rPath = localDirPath;
    if (useLocalCacheDirectoryManager && localDirPath != null) {
      LocalCacheDirectoryManager dir =
          getOrCreateDirectoryManager(localDirPath);

      String hierarchicalPath = dir.getRelativePathForLocalization();
      // For most of the scenarios we will get root path only which
      // is an empty string
      if (!hierarchicalPath.isEmpty()) {
        rPath = new Path(localDirPath, hierarchicalPath);
      }
      inProgressLocalResourcesMap.put(req, rPath);
    }

    while (true) {
      Path uniquePath = new Path(rPath,
          Long.toString(uniqueNumberGenerator.incrementAndGet()));
      File file = new File(uniquePath.toUri().getRawPath());
      if (!file.exists()) {
        rPath = uniquePath;
        break;
      }
      // If the directory already exists, delete it and move to next one.
      LOG.warn("Directory " + uniquePath + " already exists, " +
          "try next one.");
      if (delService != null) {
        delService.delete(getUser(), uniquePath);
      }
    }

    Path localPath = new Path(rPath, req.getPath().getName());
    // NOTE(review): assumes req is already tracked (a REQUEST event was
    // handled for it); an untracked request would fail here. Confirm with
    // callers.
    LocalizedResource rsrc = localrsrc.get(req);
    rsrc.setLocalPath(localPath);
    LocalResource lr = LocalResource.newInstance(req.getResource(),
        req.getType(), req.getVisibility(), req.getSize(),
        req.getTimestamp());
    try {
      stateStore.startResourceLocalization(user, appId,
          ((LocalResourcePBImpl) lr).getProto(), localPath);
    } catch (IOException e) {
      // Best effort: the failure is logged and the path is still returned.
      LOG.error("Unable to record localization start for " + rsrc, e);
    }
    return rPath;
  }

  /**
   * @param request the request to look up
   * @return the tracked resource for the request, or null when the request
   *         is not tracked
   */
  @Override
  public LocalizedResource getLocalizedResource(LocalResourceRequest request) {
    return localrsrc.get(request);
  }

  /**
   * @param localDirPath cache root directory to look up
   * @return the directory manager registered for the path, or null when
   *         none is registered or the local cache directory manager is
   *         disabled
   */
  @VisibleForTesting
  LocalCacheDirectoryManager getDirectoryManager(Path localDirPath) {
    LocalCacheDirectoryManager mgr = null;
    if (useLocalCacheDirectoryManager) {
      mgr = directoryManagers.get(localDirPath);
    }
    return mgr;
  }
}