/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.tracing;

import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.htrace.HTraceConfiguration;
import org.apache.htrace.Sampler;
import org.apache.htrace.Span;
import org.apache.htrace.SpanReceiver;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;

import com.google.common.base.Supplier;
public class TestTracing {

  private static Configuration conf;
  private static MiniDFSCluster cluster;
  private static DistributedFileSystem dfs;

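  // Verifies that no spans are reported while tracing is off, then runs
  // the traced write and read scenarios in sequence.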
  @Test
  public void testTracing() throws Exception {
    // write and read without tracing started
    String fileName = "testTracingDisabled.dat";
    writeTestFile(fileName);
    Assert.assertEquals(0, SetSpanReceiver.SetHolder.size());
    readTestFile(fileName);
    Assert.assertEquals(0, SetSpanReceiver.SetHolder.size());

    writeWithTracing();
    readWithTracing();
  }

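  // Exercises the HDFS write path under an active trace span and checks
  // that the expected client- and datanode-side spans are reported.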
  public void writeWithTracing() throws Exception {
    long startTime = System.currentTimeMillis();
    TraceScope ts = Trace.startSpan("testWriteTraceHooks", Sampler.ALWAYS);
    writeTestFile("testWriteTraceHooks.dat");
    long endTime = System.currentTimeMillis();
    ts.close();

    String[] expectedSpanNames = {
      "testWriteTraceHooks",
      "org.apache.hadoop.hdfs.protocol.ClientProtocol.create",
      "ClientNamenodeProtocol#create",
      "org.apache.hadoop.hdfs.protocol.ClientProtocol.fsync",
      "ClientNamenodeProtocol#fsync",
      "org.apache.hadoop.hdfs.protocol.ClientProtocol.complete",
      "ClientNamenodeProtocol#complete",
      "newStreamForCreate",
      "DFSOutputStream#writeChunk",
      "DFSOutputStream#close",
      "dataStreamer",
      "OpWriteBlockProto",
      "org.apache.hadoop.hdfs.protocol.ClientProtocol.addBlock",
      "ClientNamenodeProtocol#addBlock"
    };
    assertSpanNamesFound(expectedSpanNames);

    // The trace should last about the same amount of time as the test.
    Map<String, List<Span>> map = SetSpanReceiver.SetHolder.getMap();
    Span s = map.get("testWriteTraceHooks").get(0);
    Assert.assertNotNull(s);
    long spanStart = s.getStartTimeMillis();
    long spanEnd = s.getStopTimeMillis();
    Assert.assertTrue(spanStart - startTime < 100);
    Assert.assertTrue(spanEnd - endTime < 100);

    // Spans homed in the top trace should have the same trace id.
    // Spans with multiple parents (e.g. "dataStreamer", added by HDFS-7054)
    // and their children are the exceptions.
    String[] spansInTopTrace = {
      "testWriteTraceHooks",
      "org.apache.hadoop.hdfs.protocol.ClientProtocol.create",
      "ClientNamenodeProtocol#create",
      "org.apache.hadoop.hdfs.protocol.ClientProtocol.fsync",
      "ClientNamenodeProtocol#fsync",
      "org.apache.hadoop.hdfs.protocol.ClientProtocol.complete",
      "ClientNamenodeProtocol#complete",
      "newStreamForCreate",
      "DFSOutputStream#writeChunk",
      "DFSOutputStream#close",
    };
    for (String desc : spansInTopTrace) {
      for (Span span : map.get(desc)) {
        Assert.assertEquals(ts.getSpan().getTraceId(), span.getTraceId());
      }
    }
    SetSpanReceiver.SetHolder.spans.clear();
  }

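  // Exercises the HDFS read path under an active trace span and checks
  // the reported spans, including that they all share the top trace id.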
  public void readWithTracing() throws Exception {
    String fileName = "testReadTraceHooks.dat";
    writeTestFile(fileName);
    long startTime = System.currentTimeMillis();
    TraceScope ts = Trace.startSpan("testReadTraceHooks", Sampler.ALWAYS);
    readTestFile(fileName);
    ts.close();
    long endTime = System.currentTimeMillis();

    String[] expectedSpanNames = {
      "testReadTraceHooks",
      "org.apache.hadoop.hdfs.protocol.ClientProtocol.getBlockLocations",
      "ClientNamenodeProtocol#getBlockLocations",
      "OpReadBlockProto"
    };
    assertSpanNamesFound(expectedSpanNames);

    // The trace should last about the same amount of time as the test.
    Map<String, List<Span>> map = SetSpanReceiver.SetHolder.getMap();
    Span s = map.get("testReadTraceHooks").get(0);
    Assert.assertNotNull(s);

    long spanStart = s.getStartTimeMillis();
    long spanEnd = s.getStopTimeMillis();
    Assert.assertTrue(spanStart - startTime < 100);
    Assert.assertTrue(spanEnd - endTime < 100);

    // There should only be one trace id, since all spans should be homed
    // in the top trace.
    for (Span span : SetSpanReceiver.SetHolder.spans.values()) {
      Assert.assertEquals(ts.getSpan().getTraceId(), span.getTraceId());
    }
    SetSpanReceiver.SetHolder.spans.clear();
  }

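  /**
   * Writes ten 100KB chunks of random text to the given path. With the
   * 100KB block size configured in setup() the file spans multiple blocks,
   * so a traced write exercises create, addBlock, fsync and complete.
   */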
  private void writeTestFile(String testFileName) throws Exception {
    Path filePath = new Path(testFileName);
    FSDataOutputStream stream = dfs.create(filePath);
    for (int i = 0; i < 10; i++) {
      byte[] data = RandomStringUtils.randomAlphabetic(102400).getBytes();
      stream.write(data);
    }
    stream.hsync();
    stream.close();
  }

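  /**
   * Reads the given file in 10KB chunks, seeking forward 5 bytes between
   * reads. The final seek is expected to run past EOF; the resulting
   * IOException is deliberately swallowed.
   */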
  private void readTestFile(String testFileName) throws Exception {
    Path filePath = new Path(testFileName);
    FSDataInputStream istream = dfs.open(filePath, 10240);
    ByteBuffer buf = ByteBuffer.allocate(10240);

    int count = 0;
    try {
      while (istream.read(buf) > 0) {
        count += 1;
        buf.clear();
        istream.seek(istream.getPos() + 5);
      }
    } catch (IOException ioe) {
      // Ignore this; it's probably a seek past EOF.
    } finally {
      istream.close();
    }
  }

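  // Use a small block size so test files span several blocks, and register
  // SetSpanReceiver so spans are collected where the test can inspect them.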
  @BeforeClass
  public static void setup() throws IOException {
    conf = new Configuration();
    conf.setLong("dfs.blocksize", 100 * 1024);
    conf.set(DFSConfigKeys.DFS_CLIENT_HTRACE_PREFIX +
        SpanReceiverHost.SPAN_RECEIVERS_CONF_SUFFIX,
        SetSpanReceiver.class.getName());
  }

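  // Each test gets a fresh 3-datanode cluster and an empty span set.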
  @Before
  public void startCluster() throws IOException {
    cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(3)
        .build();
    cluster.waitActive();
    dfs = cluster.getFileSystem();
    SetSpanReceiver.SetHolder.spans.clear();
  }

  @After
  public void shutDown() throws IOException {
    cluster.shutdown();
  }

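  // Spans arrive asynchronously via receiveSpan(), so poll until all the
  // expected span names show up rather than asserting immediately.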
  static void assertSpanNamesFound(final String[] expectedSpanNames) {
    try {
      GenericTestUtils.waitFor(new Supplier<Boolean>() {
        @Override
        public Boolean get() {
          Map<String, List<Span>> map = SetSpanReceiver.SetHolder.getMap();
          for (String spanName : expectedSpanNames) {
            if (!map.containsKey(spanName)) {
              return false;
            }
          }
          return true;
        }
      }, 100, 1000);
    } catch (TimeoutException e) {
      Assert.fail("timed out waiting for expected spans: " + e.getMessage());
    } catch (InterruptedException e) {
      Assert.fail("interrupted while waiting for spans: " + e.getMessage());
    }
  }

  /**
   * Span receiver that puts all spans into a single set.
   * This is useful for testing.
   * <p/>
   * We're not using HTrace's POJOReceiver here because it doesn't push
   * the spans to a static place, which would make testing SpanReceiverHost
   * harder.
   */
  public static class SetSpanReceiver implements SpanReceiver {

    public SetSpanReceiver(HTraceConfiguration conf) {
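      // The configuration is unused; this constructor signature only exists
      // so the span receiver loading code can instantiate this receiver.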
    }

    public void receiveSpan(Span span) {
      SetHolder.spans.put(span.getSpanId(), span);
    }

    public void close() {
    }

    public static class SetHolder {
      public static ConcurrentHashMap<Long, Span> spans =
          new ConcurrentHashMap<Long, Span>();

      public static int size() {
        return spans.size();
      }

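      // Group the received spans by description so tests can look them up
      // by span name.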
      public static Map<String, List<Span>> getMap() {
        Map<String, List<Span>> map = new HashMap<String, List<Span>>();

        for (Span s : spans.values()) {
          List<Span> l = map.get(s.getDescription());
          if (l == null) {
            l = new LinkedList<Span>();
            map.put(s.getDescription(), l);
          }
          l.add(s);
        }
        return map;
      }
    }
  }
}