/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import java.io.IOException;
import java.util.Arrays;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure.Policy;
import org.apache.hadoop.io.IOUtils;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;

/**
 * This class tests the replace-datanode-on-failure feature: when a datanode
 * in a write pipeline fails, a new datanode can be added to replace it so
 * that the pipeline keeps enough replicas.
 */
public class TestReplaceDatanodeOnFailure {
  static final Log LOG = AppendTestUtil.LOG;

  static final String DIR = "/" + TestReplaceDatanodeOnFailure.class.getSimpleName() + "/";
  static final short REPLICATION = 3;
  final private static String RACK0 = "/rack0";
  final private static String RACK1 = "/rack1";

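  // Turn up data transfer logging so pipeline setup and recovery
  // are visible in the test output.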
  {
    ((Log4JLogger)DataTransferProtocol.LOG).getLogger().setLevel(Level.ALL);
  }

  /** Test DEFAULT ReplaceDatanodeOnFailure policy. */
  @Test
  public void testDefaultPolicy() throws Exception {
    final Configuration conf = new HdfsConfiguration();
    final ReplaceDatanodeOnFailure p = ReplaceDatanodeOnFailure.get(conf);

    final DatanodeInfo[] infos = new DatanodeInfo[5];
    final DatanodeInfo[][] datanodes = new DatanodeInfo[infos.length + 1][];
    datanodes[0] = new DatanodeInfo[0];
    for(int i = 0; i < infos.length; ) {
      infos[i] = DFSTestUtil.getLocalDatanodeInfo(50020 + i);
      i++;
      datanodes[i] = new DatanodeInfo[i];
      System.arraycopy(infos, 0, datanodes[i], 0, datanodes[i].length);
    }

    final boolean[] isAppend   = {true, true, false, false};
    final boolean[] isHflushed = {true, false, true, false};

    for(short replication = 1; replication <= infos.length; replication++) {
      for(int nExistings = 0; nExistings < datanodes.length; nExistings++) {
        final DatanodeInfo[] existings = datanodes[nExistings];
        Assert.assertEquals(nExistings, existings.length);

        for(int i = 0; i < isAppend.length; i++) {
          for(int j = 0; j < isHflushed.length; j++) {
            final int half = replication/2;
            final boolean enoughReplica = replication <= nExistings;
            final boolean noReplica = nExistings == 0;
            final boolean replicationL3 = replication < 3;
            final boolean existingsLEhalf = nExistings <= half;
            final boolean isAH = isAppend[i] || isHflushed[j];

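            // Expected behavior of the DEFAULT policy, as encoded below:
            // no replacement when the pipeline already has enough replicas,
            // has no replica at all, or the replication factor is below 3;
            // otherwise a replacement is expected for append/hflushed
            // streams, or when at most half of the pipeline remains.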
            final boolean expected;
            if (enoughReplica || noReplica || replicationL3) {
              expected = false;
            } else {
              expected = isAH || existingsLEhalf;
            }

            final boolean computed = p.satisfy(
                replication, existings, isAppend[i], isHflushed[j]);
            try {
              Assert.assertEquals(expected, computed);
            } catch(AssertionError e) {
              final String s = "replication=" + replication
                           + "\nnExistings =" + nExistings
                           + "\nisAppend   =" + isAppend[i]
                           + "\nisHflushed =" + isHflushed[j];
              throw new RuntimeException(s, e);
            }
          }
        }
      }
    }
  }

  /** Test replace datanode on failure. */
  @Test
  public void testReplaceDatanodeOnFailure() throws Exception {
    final Configuration conf = new HdfsConfiguration();

    //always replace a datanode
    ReplaceDatanodeOnFailure.write(Policy.ALWAYS, true, conf);

    final String[] racks = new String[REPLICATION];
    Arrays.fill(racks, RACK0);
    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf
        ).racks(racks).numDataNodes(REPLICATION).build();

    try {
      final DistributedFileSystem fs = cluster.getFileSystem();
      final Path dir = new Path(DIR);

      final SlowWriter[] slowwriters = new SlowWriter[10];
      for(int i = 1; i <= slowwriters.length; i++) {
        //create slow writers with different write delays
        slowwriters[i - 1] = new SlowWriter(fs, new Path(dir, "file" + i), i*200L);
      }

      for(SlowWriter s : slowwriters) {
        s.start();
      }

      // Let the slow writers write something.
      // Some of them are so slow that they will not have started writing yet.
      sleepSeconds(1);

      //start new datanodes
      cluster.startDataNodes(conf, 2, true, null, new String[]{RACK1, RACK1});
      //stop an old datanode
      cluster.stopDataNode(AppendTestUtil.nextInt(REPLICATION));

      //Let the slow writers write for a few more seconds.
      //Everyone should have written something by now.
      sleepSeconds(5);

      //check replication and interrupt.
      for(SlowWriter s : slowwriters) {
        s.checkReplication();
        s.interruptRunning();
      }

      //close files
      for(SlowWriter s : slowwriters) {
        s.joinAndClose();
      }

      //Verify the file
      LOG.info("Verify the file");
      for(int i = 0; i < slowwriters.length; i++) {
        LOG.info(slowwriters[i].filepath + ": length="
            + fs.getFileStatus(slowwriters[i].filepath).getLen());
        FSDataInputStream in = null;
        try {
          in = fs.open(slowwriters[i].filepath);
          for(int j = 0, x; (x = in.read()) != -1; j++) {
            Assert.assertEquals(j, x);
          }
        }
        finally {
          IOUtils.closeStream(in);
        }
      }
    } finally {
      if (cluster != null) {cluster.shutdown();}
    }
  }

  static void sleepSeconds(final int waittime) throws InterruptedException {
    LOG.info("Wait " + waittime + " seconds");
    Thread.sleep(waittime * 1000L);
  }

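  /**
   * A writer thread that writes one byte at a time and hflushes after each
   * write, sleeping in between, so that its pipeline stays open long enough
   * for datanodes to be replaced while it is running.
   */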
  static class SlowWriter extends Thread {
    final Path filepath;
    final HdfsDataOutputStream out;
    final long sleepms;
    private volatile boolean running = true;

    SlowWriter(DistributedFileSystem fs, Path filepath, final long sleepms
        ) throws IOException {
      super(SlowWriter.class.getSimpleName() + ":" + filepath);
      this.filepath = filepath;
      this.out = (HdfsDataOutputStream)fs.create(filepath, REPLICATION);
      this.sleepms = sleepms;
    }

    @Override
    public void run() {
      int i = 0;
      try {
        sleep(sleepms);
        for(; running; i++) {
          LOG.info(getName() + " writes " + i);
          out.write(i);
          out.hflush();
          sleep(sleepms);
        }
      } catch(InterruptedException e) {
        LOG.info(getName() + " interrupted:" + e);
      } catch(IOException e) {
        throw new RuntimeException(getName(), e);
      } finally {
        LOG.info(getName() + " terminated: i=" + i);
      }
    }

    void interruptRunning() {
      running = false;
      interrupt();
    }

    void joinAndClose() throws InterruptedException {
      LOG.info(getName() + " join and close");
      join();
      IOUtils.closeStream(out);
    }

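    /** Assert that the block currently being written has full replication. */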
    void checkReplication() throws IOException {
      Assert.assertEquals(REPLICATION, out.getCurrentBlockReplication());
    }
  }

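  /**
   * Test append with the default policy on a single-datanode cluster.
   * Appending to the empty file succeeds, presumably since a fresh block is
   * allocated, but a second append must reuse the single-datanode pipeline,
   * which the policy rejects because no replacement datanode can be added.
   */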
  @Test
  public void testAppend() throws Exception {
    final Configuration conf = new HdfsConfiguration();
    final short REPLICATION = (short)3;

    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf
        ).numDataNodes(1).build();

    try {
      final DistributedFileSystem fs = cluster.getFileSystem();
      final Path f = new Path(DIR, "testAppend");

      {
        LOG.info("create an empty file " + f);
        fs.create(f, REPLICATION).close();
        final FileStatus status = fs.getFileStatus(f);
        Assert.assertEquals(REPLICATION, status.getReplication());
        Assert.assertEquals(0L, status.getLen());
      }

      final byte[] bytes = new byte[1000];
      {
        LOG.info("append " + bytes.length + " bytes to " + f);
        final FSDataOutputStream out = fs.append(f);
        out.write(bytes);
        out.close();

        final FileStatus status = fs.getFileStatus(f);
        Assert.assertEquals(REPLICATION, status.getReplication());
        Assert.assertEquals(bytes.length, status.getLen());
      }

      {
        LOG.info("append another " + bytes.length + " bytes to " + f);
        try {
          final FSDataOutputStream out = fs.append(f);
          out.write(bytes);
          out.close();

          Assert.fail();
        } catch(IOException ioe) {
          LOG.info("This exception is expected", ioe);
        }
      }
    } finally {
      if (cluster != null) {cluster.shutdown();}
    }
  }

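  /**
   * Test best-effort mode: with best effort enabled, writes and appends
   * should succeed even when a required replacement datanode is unavailable.
   */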
  @Test
  public void testBestEffort() throws Exception {
    final Configuration conf = new HdfsConfiguration();

    //always replace a datanode, but do not throw an exception if it fails
    ReplaceDatanodeOnFailure.write(Policy.ALWAYS, true, conf);

    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf
        ).numDataNodes(1).build();

    try {
      final DistributedFileSystem fs = cluster.getFileSystem();
      final Path f = new Path(DIR, "testIgnoreReplaceFailure");

      final byte[] bytes = new byte[1000];
      {
        LOG.info("write " + bytes.length + " bytes to " + f);
        final FSDataOutputStream out = fs.create(f, REPLICATION);
        out.write(bytes);
        out.close();

        final FileStatus status = fs.getFileStatus(f);
        Assert.assertEquals(REPLICATION, status.getReplication());
        Assert.assertEquals(bytes.length, status.getLen());
      }

      {
        LOG.info("append another " + bytes.length + " bytes to " + f);
        final FSDataOutputStream out = fs.append(f);
        out.write(bytes);
        out.close();
      }
    } finally {
      if (cluster != null) {cluster.shutdown();}
    }
  }
}