1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 
19 package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache;
20 
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.lang.reflect.UndeclaredThrowableException;
24 import java.net.URISyntaxException;
25 import java.util.Random;
26 import java.util.concurrent.Callable;
27 
28 import org.apache.commons.logging.Log;
29 import org.apache.commons.logging.LogFactory;
30 import org.apache.hadoop.conf.Configuration;
31 import org.apache.hadoop.fs.FileStatus;
32 import org.apache.hadoop.fs.FileSystem;
33 import org.apache.hadoop.fs.FileUtil;
34 import org.apache.hadoop.fs.Path;
35 import org.apache.hadoop.fs.permission.FsPermission;
36 import org.apache.hadoop.yarn.api.records.LocalResource;
37 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
38 import org.apache.hadoop.yarn.conf.YarnConfiguration;
39 import org.apache.hadoop.yarn.exceptions.YarnException;
40 import org.apache.hadoop.yarn.factories.RecordFactory;
41 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
42 import org.apache.hadoop.yarn.server.api.SCMUploaderProtocol;
43 import org.apache.hadoop.yarn.server.api.protocolrecords.SCMUploaderNotifyRequest;
44 import org.apache.hadoop.yarn.server.sharedcache.SharedCacheUtil;
45 import org.apache.hadoop.yarn.sharedcache.SharedCacheChecksum;
46 import org.apache.hadoop.yarn.sharedcache.SharedCacheChecksumFactory;
47 import org.apache.hadoop.yarn.util.ConverterUtils;
48 import org.apache.hadoop.yarn.util.FSDownload;
49 
50 import com.google.common.annotations.VisibleForTesting;
51 
52 /**
53  * The callable class that handles the actual upload to the shared cache.
54  */
55 class SharedCacheUploader implements Callable<Boolean> {
56   // rwxr-xr-x
57   static final FsPermission DIRECTORY_PERMISSION =
58       new FsPermission((short)00755);
59   // r-xr-xr-x
60   static final FsPermission FILE_PERMISSION =
61       new FsPermission((short)00555);
62 
63   private static final Log LOG = LogFactory.getLog(SharedCacheUploader.class);
64   private static final ThreadLocal<Random> randomTl =
65       new ThreadLocal<Random>() {
66         @Override
67         protected Random initialValue() {
68           return new Random(System.nanoTime());
69         }
70       };
71 
72   private final LocalResource resource;
73   private final Path localPath;
74   private final String user;
75   private final Configuration conf;
76   private final SCMUploaderProtocol scmClient;
77   private final FileSystem fs;
78   private final FileSystem localFs;
79   private final String sharedCacheRootDir;
80   private final int nestedLevel;
81   private final SharedCacheChecksum checksum;
82   private final RecordFactory recordFactory;
83 
SharedCacheUploader(LocalResource resource, Path localPath, String user, Configuration conf, SCMUploaderProtocol scmClient)84   public SharedCacheUploader(LocalResource resource, Path localPath,
85       String user, Configuration conf, SCMUploaderProtocol scmClient)
86           throws IOException {
87     this(resource, localPath, user, conf, scmClient,
88         FileSystem.get(conf), localPath.getFileSystem(conf));
89   }
90 
91   /**
92    * @param resource the local resource that contains the original remote path
93    * @param localPath the path in the local filesystem where the resource is
94    * localized
95    * @param fs the filesystem of the shared cache
96    * @param localFs the local filesystem
97    */
SharedCacheUploader(LocalResource resource, Path localPath, String user, Configuration conf, SCMUploaderProtocol scmClient, FileSystem fs, FileSystem localFs)98   public SharedCacheUploader(LocalResource resource, Path localPath,
99       String user, Configuration conf, SCMUploaderProtocol scmClient,
100       FileSystem fs, FileSystem localFs) {
101     this.resource = resource;
102     this.localPath = localPath;
103     this.user = user;
104     this.conf = conf;
105     this.scmClient = scmClient;
106     this.fs = fs;
107     this.sharedCacheRootDir =
108         conf.get(YarnConfiguration.SHARED_CACHE_ROOT,
109             YarnConfiguration.DEFAULT_SHARED_CACHE_ROOT);
110     this.nestedLevel = SharedCacheUtil.getCacheDepth(conf);
111     this.checksum = SharedCacheChecksumFactory.getChecksum(conf);
112     this.localFs = localFs;
113     this.recordFactory = RecordFactoryProvider.getRecordFactory(null);
114   }
115 
116   /**
117    * Uploads the file under the shared cache, and notifies the shared cache
118    * manager. If it is unable to upload the file because it already exists, it
119    * returns false.
120    */
121   @Override
call()122   public Boolean call() throws Exception {
123     Path tempPath = null;
124     try {
125       if (!verifyAccess()) {
126         LOG.warn("User " + user + " is not authorized to upload file " +
127             localPath.getName());
128         return false;
129       }
130 
131       // first determine the actual local path that will be used for upload
132       Path actualPath = getActualPath();
133       // compute the checksum
134       String checksumVal = computeChecksum(actualPath);
135       // create the directory (if it doesn't exist)
136       Path directoryPath =
137           new Path(SharedCacheUtil.getCacheEntryPath(nestedLevel,
138               sharedCacheRootDir, checksumVal));
139       // let's not check if the directory already exists: in the vast majority
140       // of the cases, the directory does not exist; as long as mkdirs does not
141       // error out if it exists, we should be fine
142       fs.mkdirs(directoryPath, DIRECTORY_PERMISSION);
143       // create the temporary file
144       tempPath = new Path(directoryPath, getTemporaryFileName(actualPath));
145       if (!uploadFile(actualPath, tempPath)) {
146         LOG.warn("Could not copy the file to the shared cache at " + tempPath);
147         return false;
148       }
149 
150       // set the permission so that it is readable but not writable
151       fs.setPermission(tempPath, FILE_PERMISSION);
152       // rename it to the final filename
153       Path finalPath = new Path(directoryPath, actualPath.getName());
154       if (!fs.rename(tempPath, finalPath)) {
155         LOG.warn("The file already exists under " + finalPath +
156             ". Ignoring this attempt.");
157         deleteTempFile(tempPath);
158         return false;
159       }
160 
161       // notify the SCM
162       if (!notifySharedCacheManager(checksumVal, actualPath.getName())) {
163         // the shared cache manager rejected the upload (as it is likely
164         // uploaded under a different name
165         // clean up this file and exit
166         fs.delete(finalPath, false);
167         return false;
168       }
169 
170       // set the replication factor
171       short replication =
172           (short)conf.getInt(YarnConfiguration.SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR,
173               YarnConfiguration.DEFAULT_SHARED_CACHE_NM_UPLOADER_REPLICATION_FACTOR);
174       fs.setReplication(finalPath, replication);
175       LOG.info("File " + actualPath.getName() +
176           " was uploaded to the shared cache at " + finalPath);
177       return true;
178     } catch (IOException e) {
179       LOG.warn("Exception while uploading the file " + localPath.getName(), e);
180       // in case an exception is thrown, delete the temp file
181       deleteTempFile(tempPath);
182       throw e;
183     }
184   }
185 
186   @VisibleForTesting
getActualPath()187   Path getActualPath() throws IOException {
188     Path path = localPath;
189     FileStatus status = localFs.getFileStatus(path);
190     if (status != null && status.isDirectory()) {
191       // for certain types of resources that get unpacked, the original file may
192       // be found under the directory with the same name (see
193       // FSDownload.unpack); check if the path is a directory and if so look
194       // under it
195       path = new Path(path, path.getName());
196     }
197     return path;
198   }
199 
deleteTempFile(Path tempPath)200   private void deleteTempFile(Path tempPath) {
201     try {
202       if (tempPath != null && fs.exists(tempPath)) {
203         fs.delete(tempPath, false);
204       }
205     } catch (IOException ignore) {}
206   }
207 
208   /**
209    * Checks that the (original) remote file is either owned by the user who
210    * started the app or public.
211    */
212   @VisibleForTesting
verifyAccess()213   boolean verifyAccess() throws IOException {
214     // if it is in the public cache, it's trivially OK
215     if (resource.getVisibility() == LocalResourceVisibility.PUBLIC) {
216       return true;
217     }
218 
219     final Path remotePath;
220     try {
221       remotePath = ConverterUtils.getPathFromYarnURL(resource.getResource());
222     } catch (URISyntaxException e) {
223       throw new IOException("Invalid resource", e);
224     }
225 
226     // get the file status of the HDFS file
227     FileSystem remoteFs = remotePath.getFileSystem(conf);
228     FileStatus status = remoteFs.getFileStatus(remotePath);
229     // check to see if the file has been modified in any way
230     if (status.getModificationTime() != resource.getTimestamp()) {
231       LOG.warn("The remote file " + remotePath +
232           " has changed since it's localized; will not consider it for upload");
233       return false;
234     }
235 
236     // check for the user ownership
237     if (status.getOwner().equals(user)) {
238       return true; // the user owns the file
239     }
240     // check if the file is publicly readable otherwise
241     return fileIsPublic(remotePath, remoteFs, status);
242   }
243 
244   @VisibleForTesting
fileIsPublic(final Path remotePath, FileSystem remoteFs, FileStatus status)245   boolean fileIsPublic(final Path remotePath, FileSystem remoteFs,
246       FileStatus status) throws IOException {
247     return FSDownload.isPublic(remoteFs, remotePath, status, null);
248   }
249 
250   /**
251    * Uploads the file to the shared cache under a temporary name, and returns
252    * the result.
253    */
254   @VisibleForTesting
uploadFile(Path sourcePath, Path tempPath)255   boolean uploadFile(Path sourcePath, Path tempPath) throws IOException {
256     return FileUtil.copy(localFs, sourcePath, fs, tempPath, false, conf);
257   }
258 
259   @VisibleForTesting
computeChecksum(Path path)260   String computeChecksum(Path path) throws IOException {
261     InputStream is = localFs.open(path);
262     try {
263       return checksum.computeChecksum(is);
264     } finally {
265       try { is.close(); } catch (IOException ignore) {}
266     }
267   }
268 
getTemporaryFileName(Path path)269   private String getTemporaryFileName(Path path) {
270     return path.getName() + "-" + randomTl.get().nextLong();
271   }
272 
273   @VisibleForTesting
notifySharedCacheManager(String checksumVal, String fileName)274   boolean notifySharedCacheManager(String checksumVal, String fileName)
275       throws IOException {
276     try {
277       SCMUploaderNotifyRequest request =
278           recordFactory.newRecordInstance(SCMUploaderNotifyRequest.class);
279       request.setResourceKey(checksumVal);
280       request.setFilename(fileName);
281       return scmClient.notify(request).getAccepted();
282     } catch (YarnException e) {
283       throw new IOException(e);
284     } catch (UndeclaredThrowableException e) {
285       // retrieve the cause of the exception and throw it as an IOException
286       throw new IOException(e.getCause() == null ? e : e.getCause());
287     }
288   }
289 }
290