1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.yarn.server.resourcemanager.metrics;
20 
21 import static org.mockito.Mockito.mock;
22 import static org.mockito.Mockito.when;
23 
24 import java.util.EnumSet;
25 
26 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
27 import org.apache.hadoop.yarn.api.records.ApplicationId;
28 import org.apache.hadoop.yarn.api.records.Container;
29 import org.apache.hadoop.yarn.api.records.ContainerId;
30 import org.apache.hadoop.yarn.api.records.ContainerState;
31 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
32 import org.apache.hadoop.yarn.api.records.NodeId;
33 import org.apache.hadoop.yarn.api.records.Priority;
34 import org.apache.hadoop.yarn.api.records.Resource;
35 import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
36 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
37 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
38 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
39 import org.apache.hadoop.yarn.conf.YarnConfiguration;
40 import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer;
41 import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants;
42 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants;
43 import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants;
44 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
45 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
46 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
47 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
48 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
49 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
50 import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore;
51 import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field;
52 import org.apache.hadoop.yarn.server.timeline.TimelineStore;
53 import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore;
54 import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore;
55 import org.junit.AfterClass;
56 import org.junit.Assert;
57 import org.junit.BeforeClass;
58 import org.junit.Test;
59 
60 public class TestSystemMetricsPublisher {
61 
  /** Timeline server shared by all tests; created and started in setup(). */
  private static ApplicationHistoryServer timelineServer;
  /** Publisher under test; initialised and started in setup(). */
  private static SystemMetricsPublisher metricsPublisher;
  /** Store obtained from the timeline server; tests read entities from it. */
  private static TimelineStore store;
65 
66   @BeforeClass
setup()67   public static void setup() throws Exception {
68     YarnConfiguration conf = new YarnConfiguration();
69     conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
70     conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, true);
71     conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE,
72         MemoryTimelineStore.class, TimelineStore.class);
73     conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS,
74         MemoryTimelineStateStore.class, TimelineStateStore.class);
75     conf.setInt(
76         YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE,
77         2);
78 
79     timelineServer = new ApplicationHistoryServer();
80     timelineServer.init(conf);
81     timelineServer.start();
82     store = timelineServer.getTimelineStore();
83 
84     metricsPublisher = new SystemMetricsPublisher();
85     metricsPublisher.init(conf);
86     metricsPublisher.start();
87   }
88 
89   @AfterClass
tearDown()90   public static void tearDown() throws Exception {
91     if (metricsPublisher != null) {
92       metricsPublisher.stop();
93     }
94     if (timelineServer != null) {
95       timelineServer.stop();
96     }
97   }
98 
99   @Test(timeout = 10000)
testPublishApplicationMetrics()100   public void testPublishApplicationMetrics() throws Exception {
101     for (int i = 1; i <= 2; ++i) {
102       ApplicationId appId = ApplicationId.newInstance(0, i);
103       RMApp app = createRMApp(appId);
104       metricsPublisher.appCreated(app, app.getStartTime());
105       metricsPublisher.appFinished(app, RMAppState.FINISHED, app.getFinishTime());
106       if (i == 1) {
107         metricsPublisher.appACLsUpdated(app, "uers1,user2", 4L);
108       } else {
109         // in case user doesn't specify the ACLs
110         metricsPublisher.appACLsUpdated(app, null, 4L);
111       }
112       TimelineEntity entity = null;
113       do {
114         entity =
115             store.getEntity(appId.toString(),
116                 ApplicationMetricsConstants.ENTITY_TYPE,
117                 EnumSet.allOf(Field.class));
118         // ensure three events are both published before leaving the loop
119       } while (entity == null || entity.getEvents().size() < 3);
120       // verify all the fields
121       Assert.assertEquals(ApplicationMetricsConstants.ENTITY_TYPE,
122           entity.getEntityType());
123       Assert
124           .assertEquals(app.getApplicationId().toString(), entity.getEntityId());
125       Assert
126           .assertEquals(
127               app.getName(),
128               entity.getOtherInfo().get(
129                   ApplicationMetricsConstants.NAME_ENTITY_INFO));
130       Assert.assertEquals(app.getQueue(),
131           entity.getOtherInfo()
132               .get(ApplicationMetricsConstants.QUEUE_ENTITY_INFO));
133       Assert
134           .assertEquals(
135               app.getUser(),
136               entity.getOtherInfo().get(
137                   ApplicationMetricsConstants.USER_ENTITY_INFO));
138       Assert
139           .assertEquals(
140               app.getApplicationType(),
141               entity.getOtherInfo().get(
142                   ApplicationMetricsConstants.TYPE_ENTITY_INFO));
143       Assert.assertEquals(app.getSubmitTime(),
144           entity.getOtherInfo().get(
145               ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO));
146       if (i == 1) {
147         Assert.assertEquals("uers1,user2",
148             entity.getOtherInfo().get(
149                 ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO));
150       } else {
151         Assert.assertEquals(
152             "",
153             entity.getOtherInfo().get(
154                 ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO));
155         Assert.assertEquals(
156             app.getRMAppMetrics().getMemorySeconds(),
157             Long.parseLong(entity.getOtherInfo()
158                 .get(ApplicationMetricsConstants.APP_MEM_METRICS).toString()));
159         Assert.assertEquals(
160             app.getRMAppMetrics().getVcoreSeconds(),
161             Long.parseLong(entity.getOtherInfo()
162                 .get(ApplicationMetricsConstants.APP_CPU_METRICS).toString()));
163       }
164       boolean hasCreatedEvent = false;
165       boolean hasFinishedEvent = false;
166       boolean hasACLsUpdatedEvent = false;
167       for (TimelineEvent event : entity.getEvents()) {
168         if (event.getEventType().equals(
169             ApplicationMetricsConstants.CREATED_EVENT_TYPE)) {
170           hasCreatedEvent = true;
171           Assert.assertEquals(app.getStartTime(), event.getTimestamp());
172         } else if (event.getEventType().equals(
173             ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) {
174           hasFinishedEvent = true;
175           Assert.assertEquals(app.getFinishTime(), event.getTimestamp());
176           Assert.assertEquals(
177               app.getDiagnostics().toString(),
178               event.getEventInfo().get(
179                   ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO));
180           Assert.assertEquals(
181               app.getFinalApplicationStatus().toString(),
182               event.getEventInfo().get(
183                   ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO));
184           Assert.assertEquals(YarnApplicationState.FINISHED.toString(), event
185               .getEventInfo().get(ApplicationMetricsConstants.STATE_EVENT_INFO));
186         } else if (event.getEventType().equals(
187             ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE)) {
188           hasACLsUpdatedEvent = true;
189           Assert.assertEquals(4L, event.getTimestamp());
190         }
191       }
192       Assert.assertTrue(hasCreatedEvent && hasFinishedEvent && hasACLsUpdatedEvent);
193     }
194   }
195 
196   @Test(timeout = 10000)
testPublishAppAttemptMetrics()197   public void testPublishAppAttemptMetrics() throws Exception {
198     ApplicationAttemptId appAttemptId =
199         ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1);
200     RMAppAttempt appAttempt = createRMAppAttempt(appAttemptId);
201     metricsPublisher.appAttemptRegistered(appAttempt, Integer.MAX_VALUE + 1L);
202     RMApp app = mock(RMApp.class);
203     when(app.getFinalApplicationStatus()).thenReturn(FinalApplicationStatus.UNDEFINED);
204     metricsPublisher.appAttemptFinished(appAttempt, RMAppAttemptState.FINISHED, app,
205         Integer.MAX_VALUE + 2L);
206     TimelineEntity entity = null;
207     do {
208       entity =
209           store.getEntity(appAttemptId.toString(),
210               AppAttemptMetricsConstants.ENTITY_TYPE,
211               EnumSet.allOf(Field.class));
212       // ensure two events are both published before leaving the loop
213     } while (entity == null || entity.getEvents().size() < 2);
214     // verify all the fields
215     Assert.assertEquals(AppAttemptMetricsConstants.ENTITY_TYPE,
216         entity.getEntityType());
217     Assert.assertEquals(appAttemptId.toString(), entity.getEntityId());
218     Assert.assertEquals(
219         appAttemptId.getApplicationId().toString(),
220         entity.getPrimaryFilters()
221             .get(AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER).iterator()
222             .next());
223     boolean hasRegisteredEvent = false;
224     boolean hasFinishedEvent = false;
225     for (TimelineEvent event : entity.getEvents()) {
226       if (event.getEventType().equals(
227           AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE)) {
228         hasRegisteredEvent = true;
229         Assert.assertEquals(appAttempt.getHost(),
230             event.getEventInfo()
231                 .get(AppAttemptMetricsConstants.HOST_EVENT_INFO));
232         Assert
233             .assertEquals(appAttempt.getRpcPort(),
234                 event.getEventInfo().get(
235                     AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO));
236         Assert.assertEquals(
237             appAttempt.getMasterContainer().getId().toString(),
238             event.getEventInfo().get(
239                 AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO));
240       } else if (event.getEventType().equals(
241           AppAttemptMetricsConstants.FINISHED_EVENT_TYPE)) {
242         hasFinishedEvent = true;
243         Assert.assertEquals(appAttempt.getDiagnostics(), event.getEventInfo()
244             .get(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO));
245         Assert.assertEquals(appAttempt.getTrackingUrl(), event.getEventInfo()
246             .get(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO));
247         Assert.assertEquals(
248             appAttempt.getOriginalTrackingUrl(),
249             event.getEventInfo().get(
250                 AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO));
251         Assert.assertEquals(
252             FinalApplicationStatus.UNDEFINED.toString(),
253             event.getEventInfo().get(
254                 AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO));
255         Assert.assertEquals(
256             YarnApplicationAttemptState.FINISHED.toString(),
257             event.getEventInfo().get(
258                 AppAttemptMetricsConstants.STATE_EVENT_INFO));
259       }
260     }
261     Assert.assertTrue(hasRegisteredEvent && hasFinishedEvent);
262   }
263 
264   @Test(timeout = 10000)
testPublishContainerMetrics()265   public void testPublishContainerMetrics() throws Exception {
266     ContainerId containerId =
267         ContainerId.newContainerId(ApplicationAttemptId.newInstance(
268             ApplicationId.newInstance(0, 1), 1), 1);
269     RMContainer container = createRMContainer(containerId);
270     metricsPublisher.containerCreated(container, container.getCreationTime());
271     metricsPublisher.containerFinished(container, container.getFinishTime());
272     TimelineEntity entity = null;
273     do {
274       entity =
275           store.getEntity(containerId.toString(),
276               ContainerMetricsConstants.ENTITY_TYPE,
277               EnumSet.allOf(Field.class));
278       // ensure two events are both published before leaving the loop
279     } while (entity == null || entity.getEvents().size() < 2);
280     // verify all the fields
281     Assert.assertEquals(ContainerMetricsConstants.ENTITY_TYPE,
282         entity.getEntityType());
283     Assert.assertEquals(containerId.toString(), entity.getEntityId());
284     Assert.assertEquals(
285         containerId.getApplicationAttemptId().toString(),
286         entity.getPrimaryFilters()
287             .get(ContainerMetricsConstants.PARENT_PRIMARIY_FILTER).iterator()
288             .next());
289     Assert.assertEquals(
290         container.getAllocatedNode().getHost(),
291         entity.getOtherInfo().get(
292             ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO));
293     Assert.assertEquals(
294         container.getAllocatedNode().getPort(),
295         entity.getOtherInfo().get(
296             ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO));
297     Assert.assertEquals(
298         container.getAllocatedResource().getMemory(),
299         entity.getOtherInfo().get(
300             ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO));
301     Assert.assertEquals(
302         container.getAllocatedResource().getVirtualCores(),
303         entity.getOtherInfo().get(
304             ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO));
305     Assert.assertEquals(
306         container.getAllocatedPriority().getPriority(),
307         entity.getOtherInfo().get(
308             ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO));
309     boolean hasCreatedEvent = false;
310     boolean hasFinishedEvent = false;
311     for (TimelineEvent event : entity.getEvents()) {
312       if (event.getEventType().equals(
313           ContainerMetricsConstants.CREATED_EVENT_TYPE)) {
314         hasCreatedEvent = true;
315         Assert.assertEquals(container.getCreationTime(), event.getTimestamp());
316       } else if (event.getEventType().equals(
317           ContainerMetricsConstants.FINISHED_EVENT_TYPE)) {
318         hasFinishedEvent = true;
319         Assert.assertEquals(container.getFinishTime(), event.getTimestamp());
320         Assert.assertEquals(
321             container.getDiagnosticsInfo(),
322             event.getEventInfo().get(
323                 ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO));
324         Assert.assertEquals(
325             container.getContainerExitStatus(),
326             event.getEventInfo().get(
327                 ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO));
328         Assert.assertEquals(container.getContainerState().toString(), event
329             .getEventInfo().get(ContainerMetricsConstants.STATE_EVENT_INFO));
330       }
331     }
332     Assert.assertTrue(hasCreatedEvent && hasFinishedEvent);
333   }
334 
createRMApp(ApplicationId appId)335   private static RMApp createRMApp(ApplicationId appId) {
336     RMApp app = mock(RMApp.class);
337     when(app.getApplicationId()).thenReturn(appId);
338     when(app.getName()).thenReturn("test app");
339     when(app.getApplicationType()).thenReturn("test app type");
340     when(app.getUser()).thenReturn("test user");
341     when(app.getQueue()).thenReturn("test queue");
342     when(app.getSubmitTime()).thenReturn(Integer.MAX_VALUE + 1L);
343     when(app.getStartTime()).thenReturn(Integer.MAX_VALUE + 2L);
344     when(app.getFinishTime()).thenReturn(Integer.MAX_VALUE + 3L);
345     when(app.getDiagnostics()).thenReturn(
346         new StringBuilder("test diagnostics info"));
347     RMAppAttempt appAttempt = mock(RMAppAttempt.class);
348     when(appAttempt.getAppAttemptId()).thenReturn(
349         ApplicationAttemptId.newInstance(appId, 1));
350     when(app.getCurrentAppAttempt()).thenReturn(appAttempt);
351     when(app.getFinalApplicationStatus()).thenReturn(
352         FinalApplicationStatus.UNDEFINED);
353     when(app.getRMAppMetrics()).thenReturn(
354         new RMAppMetrics(null, 0, 0, Integer.MAX_VALUE, Long.MAX_VALUE));
355     return app;
356   }
357 
createRMAppAttempt( ApplicationAttemptId appAttemptId)358   private static RMAppAttempt createRMAppAttempt(
359       ApplicationAttemptId appAttemptId) {
360     RMAppAttempt appAttempt = mock(RMAppAttempt.class);
361     when(appAttempt.getAppAttemptId()).thenReturn(appAttemptId);
362     when(appAttempt.getHost()).thenReturn("test host");
363     when(appAttempt.getRpcPort()).thenReturn(-100);
364     Container container = mock(Container.class);
365     when(container.getId())
366         .thenReturn(ContainerId.newContainerId(appAttemptId, 1));
367     when(appAttempt.getMasterContainer()).thenReturn(container);
368     when(appAttempt.getDiagnostics()).thenReturn("test diagnostics info");
369     when(appAttempt.getTrackingUrl()).thenReturn("test tracking url");
370     when(appAttempt.getOriginalTrackingUrl()).thenReturn(
371         "test original tracking url");
372     return appAttempt;
373   }
374 
createRMContainer(ContainerId containerId)375   private static RMContainer createRMContainer(ContainerId containerId) {
376     RMContainer container = mock(RMContainer.class);
377     when(container.getContainerId()).thenReturn(containerId);
378     when(container.getAllocatedNode()).thenReturn(
379         NodeId.newInstance("test host", -100));
380     when(container.getAllocatedResource()).thenReturn(
381         Resource.newInstance(-1, -1));
382     when(container.getAllocatedPriority()).thenReturn(Priority.UNDEFINED);
383     when(container.getCreationTime()).thenReturn(Integer.MAX_VALUE + 1L);
384     when(container.getFinishTime()).thenReturn(Integer.MAX_VALUE + 2L);
385     when(container.getDiagnosticsInfo()).thenReturn("test diagnostics info");
386     when(container.getContainerExitStatus()).thenReturn(-1);
387     when(container.getContainerState()).thenReturn(ContainerState.COMPLETE);
388     Container mockContainer = mock(Container.class);
389     when(container.getContainer()).thenReturn(mockContainer);
390     when(mockContainer.getNodeHttpAddress())
391       .thenReturn("http://localhost:1234");
392     return container;
393   }
394 
395 }
396