/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.namenode.ha;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;

import java.io.IOException;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSClientAdapter;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

/**
 * Exercises NameNode failover on an HA {@link MiniDFSCluster} that has block
 * access tokens enabled.
 */
public class TestFailoverWithBlockTokensEnabled {

  private static final Path TEST_PATH = new Path("/test-path");
  private static final String TEST_DATA = "very important text";

  /** How long to wait (10s) for DataNodes to heartbeat and pick up new keys. */
  private static final long KEY_PROPAGATION_WAIT_MS = 10 * 1000;

  private Configuration conf;
  private MiniDFSCluster cluster;

  /**
   * Starts a one-DataNode cluster with the simple HA NameNode topology and
   * block access tokens switched on.
   *
   * @throws IOException if the cluster cannot be built
   */
  @Before
  public void startCluster() throws IOException {
    conf = new Configuration();
    conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true);
    // Set short retry timeouts so this test runs faster
    conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, 10);
    cluster = new MiniDFSCluster.Builder(conf)
        .nnTopology(MiniDFSNNTopology.simpleHATopology())
        .numDataNodes(1)
        .build();
  }

  /** Shuts the cluster down if {@link #startCluster()} managed to create it. */
  @After
  public void shutDownCluster() {
    if (cluster != null) {
      cluster.shutdown();
    }
  }

  /**
   * Feeds the same serial number to the block token secret managers of both
   * NameNodes and checks that the serial numbers they report afterwards are
   * never equal.
   */
  @Test
  public void ensureSerialNumbersNeverOverlap() {
    BlockTokenSecretManager btsm1 = cluster.getNamesystem(0).getBlockManager()
        .getBlockTokenSecretManager();
    BlockTokenSecretManager btsm2 = cluster.getNamesystem(1).getBlockManager()
        .getBlockTokenSecretManager();

    // Zero, both extremes of the int range, and the midpoints of each half.
    int[] serialNumbers = {
        0,
        Integer.MAX_VALUE,
        Integer.MIN_VALUE,
        Integer.MAX_VALUE / 2,
        Integer.MIN_VALUE / 2,
    };
    for (int serialNo : serialNumbers) {
      assertSerialNumbersDiffer(btsm1, btsm2, serialNo);
    }
  }

  /**
   * Sets {@code serialNo} on both managers and asserts that the values they
   * subsequently report differ.
   */
  private static void assertSerialNumbersDiffer(BlockTokenSecretManager btsm1,
      BlockTokenSecretManager btsm2, int serialNo) {
    // NOTE(review): presumably setSerialNo() maps the value into a
    // per-NameNode range, which is why equal inputs must yield different
    // outputs -- confirm against BlockTokenSecretManager.
    btsm1.setSerialNo(serialNo);
    btsm2.setSerialNo(serialNo);
    assertFalse("Serial numbers overlap after setSerialNo(" + serialNo + ")",
        btsm1.getSerialNoForTesting() == btsm2.getSerialNoForTesting());
  }

  /**
   * Writes and reads a file normally, then swaps in a spied {@link DFSClient}
   * whose located blocks carry tampered tokens, and verifies that a second
   * read fails with an {@link IOException} mentioning
   * "Could not obtain block".
   */
  @Test
  public void ensureInvalidBlockTokensAreRejected() throws IOException,
      URISyntaxException {
    cluster.transitionToActive(0);
    FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);

    DFSTestUtil.writeFile(fs, TEST_PATH, TEST_DATA);
    assertEquals(TEST_DATA, DFSTestUtil.readFile(fs, TEST_PATH));

    DFSClient dfsClient = DFSClientAdapter.getDFSClient((DistributedFileSystem) fs);
    DFSClient spyDfsClient = Mockito.spy(dfsClient);
    Mockito.doAnswer(
        new Answer<LocatedBlocks>() {
          @Override
          public LocatedBlocks answer(InvocationOnMock invocation)
              throws Throwable {
            LocatedBlocks locatedBlocks =
                (LocatedBlocks) invocation.callRealMethod();
            for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
              Token<BlockTokenIdentifier> token = lb.getBlockToken();
              BlockTokenIdentifier id = token.decodeIdentifier();
              // This will make the token invalid, since the password
              // won't match anymore
              id.setExpiryDate(Time.now() + 10);
              // Rebuild the token from the altered identifier but keep the
              // original password, kind and service.
              Token<BlockTokenIdentifier> newToken =
                  new Token<BlockTokenIdentifier>(id.getBytes(),
                      token.getPassword(), token.getKind(), token.getService());
              lb.setBlockToken(newToken);
            }
            return locatedBlocks;
          }
        }).when(spyDfsClient).getLocatedBlocks(Mockito.anyString(),
            Mockito.anyLong(), Mockito.anyLong());
    DFSClientAdapter.setDFSClient((DistributedFileSystem) fs, spyDfsClient);

    try {
      assertEquals(TEST_DATA, DFSTestUtil.readFile(fs, TEST_PATH));
      fail("Shouldn't have been able to read a file with invalid block tokens");
    } catch (IOException ioe) {
      GenericTestUtils.assertExceptionContains("Could not obtain block", ioe);
    }
  }

  /** Writes through each NameNode in turn without touching the block keys. */
  @Test
  public void testFailoverAfterRegistration() throws IOException,
      URISyntaxException {
    writeUsingBothNameNodes();
  }

  /**
   * Clears the block keys and shortens the key update interval, waits for the
   * DataNodes to pick up new keys, then writes through each NameNode in turn.
   */
  // NOTE(review): the method name starts with an upper-case letter; it is kept
  // as-is so that anything selecting this test by name keeps working.
  @Test
  public void TestFailoverAfterAccessKeyUpdate() throws IOException,
      URISyntaxException, InterruptedException {
    lowerKeyUpdateIntervalAndClearKeys(cluster);
    // Sleep 10s to guarantee DNs heartbeat and get new keys.
    Thread.sleep(KEY_PROPAGATION_WAIT_MS);
    writeUsingBothNameNodes();
  }

  /**
   * Writes the test file while NameNode 0 is active, fails over to
   * NameNode 1, then deletes and rewrites the file through the same
   * failover file system.
   */
  private void writeUsingBothNameNodes() throws ServiceFailedException,
      IOException, URISyntaxException {
    cluster.transitionToActive(0);

    FileSystem fs = HATestUtil.configureFailoverFs(cluster, conf);
    DFSTestUtil.writeFile(fs, TEST_PATH, TEST_DATA);

    cluster.transitionToStandby(0);
    cluster.transitionToActive(1);

    fs.delete(TEST_PATH, false);
    DFSTestUtil.writeFile(fs, TEST_PATH, TEST_DATA);
  }

  /**
   * Applies {@link #lowerKeyUpdateIntervalAndClearKeys(FSNamesystem)} to both
   * NameNodes and clears the block secret keys held by every DataNode.
   */
  private static void lowerKeyUpdateIntervalAndClearKeys(MiniDFSCluster cluster) {
    lowerKeyUpdateIntervalAndClearKeys(cluster.getNamesystem(0));
    lowerKeyUpdateIntervalAndClearKeys(cluster.getNamesystem(1));
    for (DataNode dn : cluster.getDataNodes()) {
      dn.clearAllBlockSecretKeys();
    }
  }

  /**
   * Shortens the key update interval and token lifetime of the given
   * namesystem's block token secret manager and drops all of its keys.
   */
  private static void lowerKeyUpdateIntervalAndClearKeys(FSNamesystem namesystem) {
    BlockTokenSecretManager btsm = namesystem.getBlockManager()
        .getBlockTokenSecretManager();
    // NOTE(review): both values look like milliseconds (i.e. 2s) -- confirm.
    btsm.setKeyUpdateIntervalForTesting(2 * 1000);
    btsm.setTokenLifetime(2 * 1000);
    btsm.clearAllKeysForTesting();
  }

}