1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.tracing; 19 20 import org.apache.commons.lang.RandomStringUtils; 21 import org.apache.hadoop.conf.Configuration; 22 import org.apache.hadoop.fs.FSDataInputStream; 23 import org.apache.hadoop.fs.FSDataOutputStream; 24 import org.apache.hadoop.fs.Path; 25 import org.apache.hadoop.hdfs.DFSConfigKeys; 26 import org.apache.hadoop.hdfs.DistributedFileSystem; 27 import org.apache.hadoop.hdfs.MiniDFSCluster; 28 import org.apache.hadoop.test.GenericTestUtils; 29 import org.apache.htrace.HTraceConfiguration; 30 import org.apache.htrace.Sampler; 31 import org.apache.htrace.Span; 32 import org.apache.htrace.SpanReceiver; 33 import org.apache.htrace.Trace; 34 import org.apache.htrace.TraceScope; 35 import org.junit.After; 36 import org.junit.Assert; 37 import org.junit.Before; 38 import org.junit.BeforeClass; 39 import org.junit.Test; 40 41 import java.io.IOException; 42 import java.nio.ByteBuffer; 43 import java.util.HashMap; 44 import java.util.LinkedList; 45 import java.util.List; 46 import java.util.Map; 47 import java.util.concurrent.ConcurrentHashMap; 48 import java.util.concurrent.TimeoutException; 49 50 import com.google.common.base.Supplier; 51 52 public class TestTracing { 53 54 private static Configuration conf; 55 private static MiniDFSCluster cluster; 56 private static DistributedFileSystem dfs; 57 58 @Test testTracing()59 public void testTracing() throws Exception { 60 // write and read without tracing started 61 String fileName = "testTracingDisabled.dat"; 62 writeTestFile(fileName); 63 Assert.assertTrue(SetSpanReceiver.SetHolder.size() == 0); 64 readTestFile(fileName); 65 Assert.assertTrue(SetSpanReceiver.SetHolder.size() == 0); 66 67 writeWithTracing(); 68 readWithTracing(); 69 } 70 writeWithTracing()71 public void writeWithTracing() throws Exception { 72 long startTime = System.currentTimeMillis(); 73 TraceScope ts = Trace.startSpan("testWriteTraceHooks", Sampler.ALWAYS); 74 writeTestFile("testWriteTraceHooks.dat"); 75 long endTime = System.currentTimeMillis(); 76 ts.close(); 77 78 String[] expectedSpanNames = { 79 "testWriteTraceHooks", 80 "org.apache.hadoop.hdfs.protocol.ClientProtocol.create", 81 "ClientNamenodeProtocol#create", 82 "org.apache.hadoop.hdfs.protocol.ClientProtocol.fsync", 83 "ClientNamenodeProtocol#fsync", 84 "org.apache.hadoop.hdfs.protocol.ClientProtocol.complete", 85 "ClientNamenodeProtocol#complete", 86 "newStreamForCreate", 87 "DFSOutputStream#writeChunk", 88 "DFSOutputStream#close", 89 "dataStreamer", 90 "OpWriteBlockProto", 91 "org.apache.hadoop.hdfs.protocol.ClientProtocol.addBlock", 92 "ClientNamenodeProtocol#addBlock" 93 }; 94 assertSpanNamesFound(expectedSpanNames); 95 96 // The trace should last about the same amount of time as the test 97 Map<String, List<Span>> map = SetSpanReceiver.SetHolder.getMap(); 98 Span s = map.get("testWriteTraceHooks").get(0); 99 Assert.assertNotNull(s); 100 long spanStart = s.getStartTimeMillis(); 101 long spanEnd = s.getStopTimeMillis(); 102 103 // Spans homed in the top trace shoud have same trace id. 104 // Spans having multiple parents (e.g. "dataStreamer" added by HDFS-7054) 105 // and children of them are exception. 106 String[] spansInTopTrace = { 107 "testWriteTraceHooks", 108 "org.apache.hadoop.hdfs.protocol.ClientProtocol.create", 109 "ClientNamenodeProtocol#create", 110 "org.apache.hadoop.hdfs.protocol.ClientProtocol.fsync", 111 "ClientNamenodeProtocol#fsync", 112 "org.apache.hadoop.hdfs.protocol.ClientProtocol.complete", 113 "ClientNamenodeProtocol#complete", 114 "newStreamForCreate", 115 "DFSOutputStream#writeChunk", 116 "DFSOutputStream#close", 117 }; 118 for (String desc : spansInTopTrace) { 119 for (Span span : map.get(desc)) { 120 Assert.assertEquals(ts.getSpan().getTraceId(), span.getTraceId()); 121 } 122 } 123 SetSpanReceiver.SetHolder.spans.clear(); 124 } 125 readWithTracing()126 public void readWithTracing() throws Exception { 127 String fileName = "testReadTraceHooks.dat"; 128 writeTestFile(fileName); 129 long startTime = System.currentTimeMillis(); 130 TraceScope ts = Trace.startSpan("testReadTraceHooks", Sampler.ALWAYS); 131 readTestFile(fileName); 132 ts.close(); 133 long endTime = System.currentTimeMillis(); 134 135 String[] expectedSpanNames = { 136 "testReadTraceHooks", 137 "org.apache.hadoop.hdfs.protocol.ClientProtocol.getBlockLocations", 138 "ClientNamenodeProtocol#getBlockLocations", 139 "OpReadBlockProto" 140 }; 141 assertSpanNamesFound(expectedSpanNames); 142 143 // The trace should last about the same amount of time as the test 144 Map<String, List<Span>> map = SetSpanReceiver.SetHolder.getMap(); 145 Span s = map.get("testReadTraceHooks").get(0); 146 Assert.assertNotNull(s); 147 148 long spanStart = s.getStartTimeMillis(); 149 long spanEnd = s.getStopTimeMillis(); 150 Assert.assertTrue(spanStart - startTime < 100); 151 Assert.assertTrue(spanEnd - endTime < 100); 152 153 // There should only be one trace id as it should all be homed in the 154 // top trace. 155 for (Span span : SetSpanReceiver.SetHolder.spans.values()) { 156 Assert.assertEquals(ts.getSpan().getTraceId(), span.getTraceId()); 157 } 158 SetSpanReceiver.SetHolder.spans.clear(); 159 } 160 writeTestFile(String testFileName)161 private void writeTestFile(String testFileName) throws Exception { 162 Path filePath = new Path(testFileName); 163 FSDataOutputStream stream = dfs.create(filePath); 164 for (int i = 0; i < 10; i++) { 165 byte[] data = RandomStringUtils.randomAlphabetic(102400).getBytes(); 166 stream.write(data); 167 } 168 stream.hsync(); 169 stream.close(); 170 } 171 readTestFile(String testFileName)172 private void readTestFile(String testFileName) throws Exception { 173 Path filePath = new Path(testFileName); 174 FSDataInputStream istream = dfs.open(filePath, 10240); 175 ByteBuffer buf = ByteBuffer.allocate(10240); 176 177 int count = 0; 178 try { 179 while (istream.read(buf) > 0) { 180 count += 1; 181 buf.clear(); 182 istream.seek(istream.getPos() + 5); 183 } 184 } catch (IOException ioe) { 185 // Ignore this it's probably a seek after eof. 186 } finally { 187 istream.close(); 188 } 189 } 190 191 @BeforeClass setup()192 public static void setup() throws IOException { 193 conf = new Configuration(); 194 conf.setLong("dfs.blocksize", 100 * 1024); 195 conf.set(DFSConfigKeys.DFS_CLIENT_HTRACE_PREFIX + 196 SpanReceiverHost.SPAN_RECEIVERS_CONF_SUFFIX, 197 SetSpanReceiver.class.getName()); 198 } 199 200 @Before startCluster()201 public void startCluster() throws IOException { 202 cluster = new MiniDFSCluster.Builder(conf) 203 .numDataNodes(3) 204 .build(); 205 cluster.waitActive(); 206 dfs = cluster.getFileSystem(); 207 SetSpanReceiver.SetHolder.spans.clear(); 208 } 209 210 @After shutDown()211 public void shutDown() throws IOException { 212 cluster.shutdown(); 213 } 214 assertSpanNamesFound(final String[] expectedSpanNames)215 static void assertSpanNamesFound(final String[] expectedSpanNames) { 216 try { 217 GenericTestUtils.waitFor(new Supplier<Boolean>() { 218 @Override 219 public Boolean get() { 220 Map<String, List<Span>> map = SetSpanReceiver.SetHolder.getMap(); 221 for (String spanName : expectedSpanNames) { 222 if (!map.containsKey(spanName)) { 223 return false; 224 } 225 } 226 return true; 227 } 228 }, 100, 1000); 229 } catch (TimeoutException e) { 230 Assert.fail("timed out to get expected spans: " + e.getMessage()); 231 } catch (InterruptedException e) { 232 Assert.fail("interrupted while waiting spans: " + e.getMessage()); 233 } 234 } 235 236 /** 237 * Span receiver that puts all spans into a single set. 238 * This is useful for testing. 239 * <p/> 240 * We're not using HTrace's POJOReceiver here so as that doesn't 241 * push all the metrics to a static place, and would make testing 242 * SpanReceiverHost harder. 243 */ 244 public static class SetSpanReceiver implements SpanReceiver { 245 SetSpanReceiver(HTraceConfiguration conf)246 public SetSpanReceiver(HTraceConfiguration conf) { 247 } 248 receiveSpan(Span span)249 public void receiveSpan(Span span) { 250 SetHolder.spans.put(span.getSpanId(), span); 251 } 252 close()253 public void close() { 254 } 255 256 public static class SetHolder { 257 public static ConcurrentHashMap<Long, Span> spans = 258 new ConcurrentHashMap<Long, Span>(); 259 size()260 public static int size() { 261 return spans.size(); 262 } 263 getMap()264 public static Map<String, List<Span>> getMap() { 265 Map<String, List<Span>> map = new HashMap<String, List<Span>>(); 266 267 for (Span s : spans.values()) { 268 List<Span> l = map.get(s.getDescription()); 269 if (l == null) { 270 l = new LinkedList<Span>(); 271 map.put(s.getDescription(), l); 272 } 273 l.add(s); 274 } 275 return map; 276 } 277 } 278 } 279 } 280