1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache; 20 21 import java.io.IOException; 22 import java.io.InputStream; 23 import java.lang.reflect.UndeclaredThrowableException; 24 import java.net.URISyntaxException; 25 import java.util.Random; 26 import java.util.concurrent.Callable; 27 28 import org.apache.commons.logging.Log; 29 import org.apache.commons.logging.LogFactory; 30 import org.apache.hadoop.conf.Configuration; 31 import org.apache.hadoop.fs.FileStatus; 32 import org.apache.hadoop.fs.FileSystem; 33 import org.apache.hadoop.fs.FileUtil; 34 import org.apache.hadoop.fs.Path; 35 import org.apache.hadoop.fs.permission.FsPermission; 36 import org.apache.hadoop.yarn.api.records.LocalResource; 37 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; 38 import org.apache.hadoop.yarn.conf.YarnConfiguration; 39 import org.apache.hadoop.yarn.exceptions.YarnException; 40 import org.apache.hadoop.yarn.factories.RecordFactory; 41 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; 42 import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol; 43 import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyRequest; 44 import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil; 45 import org.apache.hadoop.yarn.sharedcache.SharedCacheChecksum; 46 import org.apache.hadoop.yarn.sharedcache.SharedCacheChecksumFactory; 47 import org.apache.hadoop.yarn.util.ConverterUtils; 48 import org.apache.hadoop.yarn.util.FSDownload; 49 50 import com.google.common.annotations.VisibleForTesting; 51 52 /** 53 * The callable class that handles the actual upload to the shared cache. 54 */ 55 class SharedCacheUploader implements Callable<Boolean> { 56 // rwxr-xr-x 57 static final FsPermission DIRECTORY_PERMISSION = 58 new FsPermission((short)00755); 59 // r-xr-xr-x 60 static final FsPermission FILE_PERMISSION = 61 new FsPermission((short)00555); 62 63 private static final Log LOG = LogFactory.getLog(SharedCacheUploader.class); 64 private static final ThreadLocal<Random> randomTl = 65 new ThreadLocal<Random>() { 66 @Override 67 protected Random initialValue() { 68 return new Random(System.nanoTime()); 69 } 70 }; 71 72 private final LocalResource resource; 73 private final Path localPath; 74 private final String user; 75 private final Configuration conf; 76 private final SCMUploaderProtocol scmClient; 77 private final FileSystem fs; 78 private final FileSystem localFs; 79 private final String sharedCacheRootDir; 80 private final int nestedLevel; 81 private final SharedCacheChecksum checksum; 82 private final RecordFactory recordFactory; 83 SharedCacheUploader(LocalResource resource, Path localPath, String user, Configuration conf, SCMUploaderProtocol scmClient)84 public SharedCacheUploader(LocalResource resource, Path localPath, 85 String user, Configuration conf, SCMUploaderProtocol scmClient) 86 throws IOException { 87 this(resource, localPath, user, conf, scmClient, 88 FileSystem.get(conf), localPath.getFileSystem(conf)); 89 } 90 91 /** 92 * @param resource the local resource that contains the original remote path 93 * @param localPath the path in the local filesystem where the resource is 94 * localized 95 * @param fs the filesystem of the shared cache 96 * @param localFs the local filesystem 97 */ SharedCacheUploader(LocalResource resource, Path localPath, String user, Configuration conf, SCMUploaderProtocol scmClient, FileSystem fs, FileSystem localFs)98 public SharedCacheUploader(LocalResource resource, Path localPath, 99 String user, Configuration conf, SCMUploaderProtocol scmClient, 100 FileSystem fs, FileSystem localFs) { 101 this.resource = resource; 102 this.localPath = localPath; 103 this.user = user; 104 this.conf = conf; 105 this.scmClient = scmClient; 106 this.fs = fs; 107 this.sharedCacheRootDir = 108 conf.get(YarnConfiguration.SHARED_CACHE_ROOT, 109 YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT); 110 this.nestedLevel = SharedCacheUtil.getCacheDepth(conf); 111 this.checksum = SharedCacheChecksumFactory.getChecksum(conf); 112 this.localFs = localFs; 113 this.recordFactory = RecordFactoryProvider.getRecordFactory(null); 114 } 115 116 /** 117 * Uploads the file under the shared cache, and notifies the shared cache 118 * manager. If it is unable to upload the file because it already exists, it 119 * returns false. 120 */ 121 @Override call()122 public Boolean call() throws Exception { 123 Path tempPath = null; 124 try { 125 if (!verifyAccess()) { 126 LOG.warn("User " + user + " is not authorized to upload file " + 127 localPath.getName()); 128 return false; 129 } 130 131 // first determine the actual local path that will be used for upload 132 Path actualPath = getActualPath(); 133 // compute the checksum 134 String checksumVal = computeChecksum(actualPath); 135 // create the directory (if it doesn't exist) 136 Path directoryPath = 137 new Path(SharedCacheUtil.getCacheEntryPath(nestedLevel, 138 sharedCacheRootDir, checksumVal)); 139 // let's not check if the directory already exists: in the vast majority 140 // of the cases, the directory does not exist; as long as mkdirs does not 141 // error out if it exists, we should be fine 142 fs.mkdirs(directoryPath, DIRECTORY_PERMISSION); 143 // create the temporary file 144 tempPath = new Path(directoryPath, getTemporaryFileName(actualPath)); 145 if (!uploadFile(actualPath, tempPath)) { 146 LOG.warn("Could not copy the file to the shared cache at " + tempPath); 147 return false; 148 } 149 150 // set the permission so that it is readable but not writable 151 fs.setPermission(tempPath, FILE_PERMISSION); 152 // rename it to the final filename 153 Path finalPath = new Path(directoryPath, actualPath.getName()); 154 if (!fs.rename(tempPath, finalPath)) { 155 LOG.warn("The file already exists under " + finalPath + 156 ". Ignoring this attempt."); 157 deleteTempFile(tempPath); 158 return false; 159 } 160 161 // notify the SCM 162 if (!notifySharedCacheManager(checksumVal, actualPath.getName())) { 163 // the shared cache manager rejected the upload (as it is likely 164 // uploaded under a different name 165 // clean up this file and exit 166 fs.delete(finalPath, false); 167 return false; 168 } 169 170 // set the replication factor 171 short replication = 172 (short)conf.getInt(YarnConfiguration.SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR, 173 YarnConfiguration.DEFAULT_SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR); 174 fs.setReplication(finalPath, replication); 175 LOG.info("File " + actualPath.getName() + 176 " was uploaded to the shared cache at " + finalPath); 177 return true; 178 } catch (IOException e) { 179 LOG.warn("Exception while uploading the file " + localPath.getName(), e); 180 // in case an exception is thrown, delete the temp file 181 deleteTempFile(tempPath); 182 throw e; 183 } 184 } 185 186 @VisibleForTesting getActualPath()187 Path getActualPath() throws IOException { 188 Path path = localPath; 189 FileStatus status = localFs.getFileStatus(path); 190 if (status != null && status.isDirectory()) { 191 // for certain types of resources that get unpacked, the original file may 192 // be found under the directory with the same name (see 193 // FSDownload.unpack); check if the path is a directory and if so look 194 // under it 195 path = new Path(path, path.getName()); 196 } 197 return path; 198 } 199 deleteTempFile(Path tempPath)200 private void deleteTempFile(Path tempPath) { 201 try { 202 if (tempPath != null && fs.exists(tempPath)) { 203 fs.delete(tempPath, false); 204 } 205 } catch (IOException ignore) {} 206 } 207 208 /** 209 * Checks that the (original) remote file is either owned by the user who 210 * started the app or public. 211 */ 212 @VisibleForTesting verifyAccess()213 boolean verifyAccess() throws IOException { 214 // if it is in the public cache, it's trivially OK 215 if (resource.getVisibility() == LocalResourceVisibility.PUBLIC) { 216 return true; 217 } 218 219 final Path remotePath; 220 try { 221 remotePath = ConverterUtils.getPathFromYarnURL(resource.getResource()); 222 } catch (URISyntaxException e) { 223 throw new IOException("Invalid resource", e); 224 } 225 226 // get the file status of the HDFS file 227 FileSystem remoteFs = remotePath.getFileSystem(conf); 228 FileStatus status = remoteFs.getFileStatus(remotePath); 229 // check to see if the file has been modified in any way 230 if (status.getModificationTime() != resource.getTimestamp()) { 231 LOG.warn("The remote file " + remotePath + 232 " has changed since it's localized; will not consider it for upload"); 233 return false; 234 } 235 236 // check for the user ownership 237 if (status.getOwner().equals(user)) { 238 return true; // the user owns the file 239 } 240 // check if the file is publicly readable otherwise 241 return fileIsPublic(remotePath, remoteFs, status); 242 } 243 244 @VisibleForTesting fileIsPublic(final Path remotePath, FileSystem remoteFs, FileStatus status)245 boolean fileIsPublic(final Path remotePath, FileSystem remoteFs, 246 FileStatus status) throws IOException { 247 return FSDownload.isPublic(remoteFs, remotePath, status, null); 248 } 249 250 /** 251 * Uploads the file to the shared cache under a temporary name, and returns 252 * the result. 253 */ 254 @VisibleForTesting uploadFile(Path sourcePath, Path tempPath)255 boolean uploadFile(Path sourcePath, Path tempPath) throws IOException { 256 return FileUtil.copy(localFs, sourcePath, fs, tempPath, false, conf); 257 } 258 259 @VisibleForTesting computeChecksum(Path path)260 String computeChecksum(Path path) throws IOException { 261 InputStream is = localFs.open(path); 262 try { 263 return checksum.computeChecksum(is); 264 } finally { 265 try { is.close(); } catch (IOException ignore) {} 266 } 267 } 268 getTemporaryFileName(Path path)269 private String getTemporaryFileName(Path path) { 270 return path.getName() + "-" + randomTl.get().nextLong(); 271 } 272 273 @VisibleForTesting notifySharedCacheManager(String checksumVal, String fileName)274 boolean notifySharedCacheManager(String checksumVal, String fileName) 275 throws IOException { 276 try { 277 SCMUploaderNotifyRequest request = 278 recordFactory.newRecordInstance(SCMUploaderNotifyRequest.class); 279 request.setResourceKey(checksumVal); 280 request.setFilename(fileName); 281 return scmClient.notify(request).getAccepted(); 282 } catch (YarnException e) { 283 throw new IOException(e); 284 } catch (UndeclaredThrowableException e) { 285 // retrieve the cause of the exception and throw it as an IOException 286 throw new IOException(e.getCause() == null ? e : e.getCause()); 287 } 288 } 289 } 290