/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs;

import java.io.IOException;
import java.util.Arrays;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure.Policy;
import org.apache.hadoop.io.IOUtils;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.Test;

/**
 * Tests for {@link ReplaceDatanodeOnFailure}: the decision made by the policy
 * obtained from a default configuration, and the behavior of writers and
 * appenders on mini clusters whose datanodes are added, stopped, or fewer
 * than the requested replication.
 */
public class TestReplaceDatanodeOnFailure {
  static final Log LOG = AppendTestUtil.LOG;

  /** Directory, named after this class, under which all test files live. */
  static final String DIR =
      "/" + TestReplaceDatanodeOnFailure.class.getSimpleName() + "/";
  /** Replication requested for the files created by these tests. */
  static final short REPLICATION = 3;
  private static final String RACK0 = "/rack0";
  private static final String RACK1 = "/rack1";

  // Instance initializer: raise the data transfer protocol log to ALL each
  // time the test class is instantiated.
  {
    ((Log4JLogger)DataTransferProtocol.LOG).getLogger().setLevel(Level.ALL);
  }

  /**
   * Test DEFAULT ReplaceDatanodeOnFailure policy.
   *
   * For every replication in [1, 5], every number of existing datanodes in
   * [0, 5] and every append/hflush combination, the value returned by
   * {@code satisfy} must match the expectation computed below: never replace
   * when there are already enough replicas, no replica at all, or the
   * replication is below 3; otherwise replace when the block is appended or
   * hflushed, or when at most half of the replicas exist.
   */
  @Test
  public void testDefaultPolicy() throws Exception {
    final Configuration conf = new HdfsConfiguration();
    final ReplaceDatanodeOnFailure p = ReplaceDatanodeOnFailure.get(conf);

    // datanodes[n] holds the first n entries of infos, for n = 0..infos.length
    final DatanodeInfo[] infos = new DatanodeInfo[5];
    final DatanodeInfo[][] datanodes = new DatanodeInfo[infos.length + 1][];
    datanodes[0] = new DatanodeInfo[0];
    for (int i = 0; i < infos.length; i++) {
      infos[i] = DFSTestUtil.getLocalDatanodeInfo(50020 + i);
      final int n = i + 1;
      datanodes[n] = new DatanodeInfo[n];
      System.arraycopy(infos, 0, datanodes[n], 0, n);
    }

    final boolean[] isAppend   = {true, true, false, false};
    final boolean[] isHflushed = {true, false, true, false};

    for (short replication = 1; replication <= infos.length; replication++) {
      for (int nExistings = 0; nExistings < datanodes.length; nExistings++) {
        final DatanodeInfo[] existings = datanodes[nExistings];
        Assert.assertEquals(nExistings, existings.length);

        for (int i = 0; i < isAppend.length; i++) {
          for (int j = 0; j < isHflushed.length; j++) {
            final int half = replication/2;
            final boolean enoughReplica = replication <= nExistings;
            final boolean noReplica = nExistings == 0;
            final boolean replicationL3 = replication < 3;
            final boolean existingsLEhalf = nExistings <= half;
            final boolean isAH = isAppend[i] || isHflushed[j];

            final boolean expected;
            if (enoughReplica || noReplica || replicationL3) {
              expected = false;
            } else {
              expected = isAH || existingsLEhalf;
            }

            final boolean computed = p.satisfy(
                replication, existings, isAppend[i], isHflushed[j]);

            // Attach the inputs to the assertion itself so that a mismatch
            // is reported by JUnit as a failure rather than as an error.
            final String s = "replication=" + replication
                + "\nnExistings =" + nExistings
                + "\nisAppend =" + isAppend[i]
                + "\nisHflushed =" + isHflushed[j];
            Assert.assertEquals(s, expected, computed);
          }
        }
      }
    }
  }

  /**
   * Test replace datanode on failure.
   *
   * Starts ten {@link SlowWriter}s of different speeds on a three-datanode
   * cluster, adds two datanodes on another rack, stops one of the original
   * datanodes, and then checks that every writer still reports a block
   * replication of {@link #REPLICATION} and that every file contains the
   * byte sequence that was written.
   */
  @Test
  public void testReplaceDatanodeOnFailure() throws Exception {
    final Configuration conf = new HdfsConfiguration();

    //always replace a datanode
    ReplaceDatanodeOnFailure.write(Policy.ALWAYS, true, conf);

    final String[] racks = new String[REPLICATION];
    Arrays.fill(racks, RACK0);
    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf
        ).racks(racks).numDataNodes(REPLICATION).build();

    try {
      final DistributedFileSystem fs = cluster.getFileSystem();
      final Path dir = new Path(DIR);

      final SlowWriter[] slowwriters = new SlowWriter[10];
      for (int i = 1; i <= slowwriters.length; i++) {
        //create slow writers in different speed
        slowwriters[i - 1] =
            new SlowWriter(fs, new Path(dir, "file" + i), i*200L);
      }

      for (SlowWriter s : slowwriters) {
        s.start();
      }

      // Let slow writers write something.
      // Some of them are too slow and will not have started yet.
      sleepSeconds(1);

      //start new datanodes
      cluster.startDataNodes(conf, 2, true, null, new String[]{RACK1, RACK1});
      //stop an old datanode
      cluster.stopDataNode(AppendTestUtil.nextInt(REPLICATION));

      //Let the slow writers write for a few more seconds.
      //Everyone should have written something.
      sleepSeconds(5);

      //check replication and interrupt.
      for (SlowWriter s : slowwriters) {
        s.checkReplication();
        s.interruptRunning();
      }

      //close files
      for (SlowWriter s : slowwriters) {
        s.joinAndClose();
      }

      //Verify the file
      LOG.info("Verify the file");
      for (int i = 0; i < slowwriters.length; i++) {
        LOG.info(slowwriters[i].filepath + ": length="
            + fs.getFileStatus(slowwriters[i].filepath).getLen());
        FSDataInputStream in = null;
        try {
          in = fs.open(slowwriters[i].filepath);
          for (int j = 0, x; (x = in.read()) != -1; j++) {
            // write(int) stores only the low-order byte and read() returns
            // 0..255, so compare modulo 256 to stay correct for long runs.
            Assert.assertEquals(j & 0xFF, x);
          }
        } finally {
          IOUtils.closeStream(in);
        }
      }
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Log and sleep for the given number of seconds.
   *
   * @param waittime the time to sleep, in seconds
   * @throws InterruptedException if the sleeping thread is interrupted
   */
  static void sleepSeconds(final int waittime) throws InterruptedException {
    LOG.info("Wait " + waittime + " seconds");
    Thread.sleep(waittime * 1000L);
  }

  /**
   * A thread that creates a file on construction and then, once started,
   * repeatedly writes one byte (an increasing counter), hflushes it and
   * sleeps, until {@link #interruptRunning()} is called.
   */
  static class SlowWriter extends Thread {
    final Path filepath;
    final HdfsDataOutputStream out;
    /** Pause, in milliseconds, before the first write and after each write. */
    final long sleepms;
    private volatile boolean running = true;

    /**
     * Create the file with {@link #REPLICATION} replicas and keep its
     * output stream open for {@link #run()}.
     *
     * @param fs the file system in which the file is created
     * @param filepath the file to write
     * @param sleepms the pause between writes, in milliseconds
     * @throws IOException if the file cannot be created
     */
    SlowWriter(DistributedFileSystem fs, Path filepath, final long sleepms
        ) throws IOException {
      super(SlowWriter.class.getSimpleName() + ":" + filepath);
      this.filepath = filepath;
      this.out = (HdfsDataOutputStream)fs.create(filepath, REPLICATION);
      this.sleepms = sleepms;
    }

    /**
     * Write, hflush and sleep in a loop until stopped. An interrupt ends the
     * loop quietly; an IOException is rethrown as a RuntimeException naming
     * this thread.
     */
    @Override
    public void run() {
      int i = 0;
      try {
        sleep(sleepms);
        for (; running; i++) {
          LOG.info(getName() + " writes " + i);
          out.write(i);
          out.hflush();
          sleep(sleepms);
        }
      } catch (InterruptedException e) {
        LOG.info(getName() + " interrupted:" + e);
      } catch (IOException e) {
        throw new RuntimeException(getName(), e);
      } finally {
        LOG.info(getName() + " terminated: i=" + i);
      }
    }

    /** Ask the write loop to stop and interrupt any sleep in progress. */
    void interruptRunning() {
      running = false;
      interrupt();
    }

    /**
     * Wait for this thread to terminate and then close the output stream.
     *
     * @throws InterruptedException if the caller is interrupted while joining
     */
    void joinAndClose() throws InterruptedException {
      LOG.info(getName() + " join and close");
      join();
      IOUtils.closeStream(out);
    }

    /**
     * Assert that the stream reports {@link #REPLICATION} replicas for the
     * block currently being written.
     *
     * @throws IOException if the current replication cannot be obtained
     */
    void checkReplication() throws IOException {
      Assert.assertEquals(REPLICATION, out.getCurrentBlockReplication());
    }
  }

  /**
   * Test append on a single-datanode cluster with a default configuration.
   *
   * An empty file is created with {@link #REPLICATION} replicas; the first
   * append of 1000 bytes must succeed and the second append must fail with
   * an IOException.
   * NOTE(review): presumably the second append fails because the default
   * policy asks for a replacement datanode and none is available -- confirm.
   */
  @Test
  public void testAppend() throws Exception {
    final Configuration conf = new HdfsConfiguration();

    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf
        ).numDataNodes(1).build();

    try {
      final DistributedFileSystem fs = cluster.getFileSystem();
      final Path f = new Path(DIR, "testAppend");

      {
        LOG.info("create an empty file " + f);
        fs.create(f, REPLICATION).close();
        final FileStatus status = fs.getFileStatus(f);
        Assert.assertEquals(REPLICATION, status.getReplication());
        Assert.assertEquals(0L, status.getLen());
      }

      final byte[] bytes = new byte[1000];
      {
        LOG.info("append " + bytes.length + " bytes to " + f);
        final FSDataOutputStream out = fs.append(f);
        out.write(bytes);
        out.close();

        final FileStatus status = fs.getFileStatus(f);
        Assert.assertEquals(REPLICATION, status.getReplication());
        Assert.assertEquals(bytes.length, status.getLen());
      }

      {
        LOG.info("append another " + bytes.length + " bytes to " + f);
        try {
          final FSDataOutputStream out = fs.append(f);
          out.write(bytes);
          out.close();

          Assert.fail("The second append to " + f
              + " should have thrown an IOException");
        } catch (IOException ioe) {
          LOG.info("This exception is expected", ioe);
        }
      }
    } finally {
      cluster.shutdown();
    }
  }

  /**
   * Test writing and appending on a single-datanode cluster when the
   * configuration is written with {@link Policy#ALWAYS} and the flag
   * {@code true} ("do not throw exception"): creating a file with
   * {@link #REPLICATION} replicas, writing 1000 bytes and then appending
   * another 1000 bytes must all complete without an exception.
   */
  @Test
  public void testBestEffort() throws Exception {
    final Configuration conf = new HdfsConfiguration();

    //always replace a datanode but do not throw exception
    ReplaceDatanodeOnFailure.write(Policy.ALWAYS, true, conf);

    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf
        ).numDataNodes(1).build();

    try {
      final DistributedFileSystem fs = cluster.getFileSystem();
      final Path f = new Path(DIR, "testIgnoreReplaceFailure");

      final byte[] bytes = new byte[1000];
      {
        LOG.info("write " + bytes.length + " bytes to " + f);
        final FSDataOutputStream out = fs.create(f, REPLICATION);
        out.write(bytes);
        out.close();

        final FileStatus status = fs.getFileStatus(f);
        Assert.assertEquals(REPLICATION, status.getReplication());
        Assert.assertEquals(bytes.length, status.getLen());
      }

      {
        LOG.info("append another " + bytes.length + " bytes to " + f);
        final FSDataOutputStream out = fs.append(f);
        out.write(bytes);
        out.close();
      }
    } finally {
      cluster.shutdown();
    }
  }
}