1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.yarn.server.resourcemanager.metrics; 20 21 import static org.mockito.Mockito.mock; 22 import static org.mockito.Mockito.when; 23 24 import java.util.EnumSet; 25 26 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; 27 import org.apache.hadoop.yarn.api.records.ApplicationId; 28 import org.apache.hadoop.yarn.api.records.Container; 29 import org.apache.hadoop.yarn.api.records.ContainerId; 30 import org.apache.hadoop.yarn.api.records.ContainerState; 31 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; 32 import org.apache.hadoop.yarn.api.records.NodeId; 33 import org.apache.hadoop.yarn.api.records.Priority; 34 import org.apache.hadoop.yarn.api.records.Resource; 35 import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState; 36 import org.apache.hadoop.yarn.api.records.YarnApplicationState; 37 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; 38 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent; 39 import org.apache.hadoop.yarn.conf.YarnConfiguration; 40 import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryServer; 41 import org.apache.hadoop.yarn.server.metrics.AppAttemptMetricsConstants; 42 import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; 43 import org.apache.hadoop.yarn.server.metrics.ContainerMetricsConstants; 44 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; 45 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; 46 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; 47 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; 48 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; 49 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; 50 import org.apache.hadoop.yarn.server.timeline.MemoryTimelineStore; 51 import org.apache.hadoop.yarn.server.timeline.TimelineReader.Field; 52 import org.apache.hadoop.yarn.server.timeline.TimelineStore; 53 import org.apache.hadoop.yarn.server.timeline.recovery.MemoryTimelineStateStore; 54 import org.apache.hadoop.yarn.server.timeline.recovery.TimelineStateStore; 55 import org.junit.AfterClass; 56 import org.junit.Assert; 57 import org.junit.BeforeClass; 58 import org.junit.Test; 59 60 public class TestSystemMetricsPublisher { 61 62 private static ApplicationHistoryServer timelineServer; 63 private static SystemMetricsPublisher metricsPublisher; 64 private static TimelineStore store; 65 66 @BeforeClass setup()67 public static void setup() throws Exception { 68 YarnConfiguration conf = new YarnConfiguration(); 69 conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); 70 conf.setBoolean(YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_ENABLED, true); 71 conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STORE, 72 MemoryTimelineStore.class, TimelineStore.class); 73 conf.setClass(YarnConfiguration.TIMELINE_SERVICE_STATE_STORE_CLASS, 74 MemoryTimelineStateStore.class, TimelineStateStore.class); 75 conf.setInt( 76 YarnConfiguration.RM_SYSTEM_METRICS_PUBLISHER_DISPATCHER_POOL_SIZE, 77 2); 78 79 timelineServer = new ApplicationHistoryServer(); 80 timelineServer.init(conf); 81 timelineServer.start(); 82 store = timelineServer.getTimelineStore(); 83 84 metricsPublisher = new SystemMetricsPublisher(); 85 metricsPublisher.init(conf); 86 metricsPublisher.start(); 87 } 88 89 @AfterClass tearDown()90 public static void tearDown() throws Exception { 91 if (metricsPublisher != null) { 92 metricsPublisher.stop(); 93 } 94 if (timelineServer != null) { 95 timelineServer.stop(); 96 } 97 } 98 99 @Test(timeout = 10000) testPublishApplicationMetrics()100 public void testPublishApplicationMetrics() throws Exception { 101 for (int i = 1; i <= 2; ++i) { 102 ApplicationId appId = ApplicationId.newInstance(0, i); 103 RMApp app = createRMApp(appId); 104 metricsPublisher.appCreated(app, app.getStartTime()); 105 metricsPublisher.appFinished(app, RMAppState.FINISHED, app.getFinishTime()); 106 if (i == 1) { 107 metricsPublisher.appACLsUpdated(app, "uers1,user2", 4L); 108 } else { 109 // in case user doesn't specify the ACLs 110 metricsPublisher.appACLsUpdated(app, null, 4L); 111 } 112 TimelineEntity entity = null; 113 do { 114 entity = 115 store.getEntity(appId.toString(), 116 ApplicationMetricsConstants.ENTITY_TYPE, 117 EnumSet.allOf(Field.class)); 118 // ensure three events are both published before leaving the loop 119 } while (entity == null || entity.getEvents().size() < 3); 120 // verify all the fields 121 Assert.assertEquals(ApplicationMetricsConstants.ENTITY_TYPE, 122 entity.getEntityType()); 123 Assert 124 .assertEquals(app.getApplicationId().toString(), entity.getEntityId()); 125 Assert 126 .assertEquals( 127 app.getName(), 128 entity.getOtherInfo().get( 129 ApplicationMetricsConstants.NAME_ENTITY_INFO)); 130 Assert.assertEquals(app.getQueue(), 131 entity.getOtherInfo() 132 .get(ApplicationMetricsConstants.QUEUE_ENTITY_INFO)); 133 Assert 134 .assertEquals( 135 app.getUser(), 136 entity.getOtherInfo().get( 137 ApplicationMetricsConstants.USER_ENTITY_INFO)); 138 Assert 139 .assertEquals( 140 app.getApplicationType(), 141 entity.getOtherInfo().get( 142 ApplicationMetricsConstants.TYPE_ENTITY_INFO)); 143 Assert.assertEquals(app.getSubmitTime(), 144 entity.getOtherInfo().get( 145 ApplicationMetricsConstants.SUBMITTED_TIME_ENTITY_INFO)); 146 if (i == 1) { 147 Assert.assertEquals("uers1,user2", 148 entity.getOtherInfo().get( 149 ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO)); 150 } else { 151 Assert.assertEquals( 152 "", 153 entity.getOtherInfo().get( 154 ApplicationMetricsConstants.APP_VIEW_ACLS_ENTITY_INFO)); 155 Assert.assertEquals( 156 app.getRMAppMetrics().getMemorySeconds(), 157 Long.parseLong(entity.getOtherInfo() 158 .get(ApplicationMetricsConstants.APP_MEM_METRICS).toString())); 159 Assert.assertEquals( 160 app.getRMAppMetrics().getVcoreSeconds(), 161 Long.parseLong(entity.getOtherInfo() 162 .get(ApplicationMetricsConstants.APP_CPU_METRICS).toString())); 163 } 164 boolean hasCreatedEvent = false; 165 boolean hasFinishedEvent = false; 166 boolean hasACLsUpdatedEvent = false; 167 for (TimelineEvent event : entity.getEvents()) { 168 if (event.getEventType().equals( 169 ApplicationMetricsConstants.CREATED_EVENT_TYPE)) { 170 hasCreatedEvent = true; 171 Assert.assertEquals(app.getStartTime(), event.getTimestamp()); 172 } else if (event.getEventType().equals( 173 ApplicationMetricsConstants.FINISHED_EVENT_TYPE)) { 174 hasFinishedEvent = true; 175 Assert.assertEquals(app.getFinishTime(), event.getTimestamp()); 176 Assert.assertEquals( 177 app.getDiagnostics().toString(), 178 event.getEventInfo().get( 179 ApplicationMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO)); 180 Assert.assertEquals( 181 app.getFinalApplicationStatus().toString(), 182 event.getEventInfo().get( 183 ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO)); 184 Assert.assertEquals(YarnApplicationState.FINISHED.toString(), event 185 .getEventInfo().get(ApplicationMetricsConstants.STATE_EVENT_INFO)); 186 } else if (event.getEventType().equals( 187 ApplicationMetricsConstants.ACLS_UPDATED_EVENT_TYPE)) { 188 hasACLsUpdatedEvent = true; 189 Assert.assertEquals(4L, event.getTimestamp()); 190 } 191 } 192 Assert.assertTrue(hasCreatedEvent && hasFinishedEvent && hasACLsUpdatedEvent); 193 } 194 } 195 196 @Test(timeout = 10000) testPublishAppAttemptMetrics()197 public void testPublishAppAttemptMetrics() throws Exception { 198 ApplicationAttemptId appAttemptId = 199 ApplicationAttemptId.newInstance(ApplicationId.newInstance(0, 1), 1); 200 RMAppAttempt appAttempt = createRMAppAttempt(appAttemptId); 201 metricsPublisher.appAttemptRegistered(appAttempt, Integer.MAX_VALUE + 1L); 202 RMApp app = mock(RMApp.class); 203 when(app.getFinalApplicationStatus()).thenReturn(FinalApplicationStatus.UNDEFINED); 204 metricsPublisher.appAttemptFinished(appAttempt, RMAppAttemptState.FINISHED, app, 205 Integer.MAX_VALUE + 2L); 206 TimelineEntity entity = null; 207 do { 208 entity = 209 store.getEntity(appAttemptId.toString(), 210 AppAttemptMetricsConstants.ENTITY_TYPE, 211 EnumSet.allOf(Field.class)); 212 // ensure two events are both published before leaving the loop 213 } while (entity == null || entity.getEvents().size() < 2); 214 // verify all the fields 215 Assert.assertEquals(AppAttemptMetricsConstants.ENTITY_TYPE, 216 entity.getEntityType()); 217 Assert.assertEquals(appAttemptId.toString(), entity.getEntityId()); 218 Assert.assertEquals( 219 appAttemptId.getApplicationId().toString(), 220 entity.getPrimaryFilters() 221 .get(AppAttemptMetricsConstants.PARENT_PRIMARY_FILTER).iterator() 222 .next()); 223 boolean hasRegisteredEvent = false; 224 boolean hasFinishedEvent = false; 225 for (TimelineEvent event : entity.getEvents()) { 226 if (event.getEventType().equals( 227 AppAttemptMetricsConstants.REGISTERED_EVENT_TYPE)) { 228 hasRegisteredEvent = true; 229 Assert.assertEquals(appAttempt.getHost(), 230 event.getEventInfo() 231 .get(AppAttemptMetricsConstants.HOST_EVENT_INFO)); 232 Assert 233 .assertEquals(appAttempt.getRpcPort(), 234 event.getEventInfo().get( 235 AppAttemptMetricsConstants.RPC_PORT_EVENT_INFO)); 236 Assert.assertEquals( 237 appAttempt.getMasterContainer().getId().toString(), 238 event.getEventInfo().get( 239 AppAttemptMetricsConstants.MASTER_CONTAINER_EVENT_INFO)); 240 } else if (event.getEventType().equals( 241 AppAttemptMetricsConstants.FINISHED_EVENT_TYPE)) { 242 hasFinishedEvent = true; 243 Assert.assertEquals(appAttempt.getDiagnostics(), event.getEventInfo() 244 .get(AppAttemptMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO)); 245 Assert.assertEquals(appAttempt.getTrackingUrl(), event.getEventInfo() 246 .get(AppAttemptMetricsConstants.TRACKING_URL_EVENT_INFO)); 247 Assert.assertEquals( 248 appAttempt.getOriginalTrackingUrl(), 249 event.getEventInfo().get( 250 AppAttemptMetricsConstants.ORIGINAL_TRACKING_URL_EVENT_INFO)); 251 Assert.assertEquals( 252 FinalApplicationStatus.UNDEFINED.toString(), 253 event.getEventInfo().get( 254 AppAttemptMetricsConstants.FINAL_STATUS_EVENT_INFO)); 255 Assert.assertEquals( 256 YarnApplicationAttemptState.FINISHED.toString(), 257 event.getEventInfo().get( 258 AppAttemptMetricsConstants.STATE_EVENT_INFO)); 259 } 260 } 261 Assert.assertTrue(hasRegisteredEvent && hasFinishedEvent); 262 } 263 264 @Test(timeout = 10000) testPublishContainerMetrics()265 public void testPublishContainerMetrics() throws Exception { 266 ContainerId containerId = 267 ContainerId.newContainerId(ApplicationAttemptId.newInstance( 268 ApplicationId.newInstance(0, 1), 1), 1); 269 RMContainer container = createRMContainer(containerId); 270 metricsPublisher.containerCreated(container, container.getCreationTime()); 271 metricsPublisher.containerFinished(container, container.getFinishTime()); 272 TimelineEntity entity = null; 273 do { 274 entity = 275 store.getEntity(containerId.toString(), 276 ContainerMetricsConstants.ENTITY_TYPE, 277 EnumSet.allOf(Field.class)); 278 // ensure two events are both published before leaving the loop 279 } while (entity == null || entity.getEvents().size() < 2); 280 // verify all the fields 281 Assert.assertEquals(ContainerMetricsConstants.ENTITY_TYPE, 282 entity.getEntityType()); 283 Assert.assertEquals(containerId.toString(), entity.getEntityId()); 284 Assert.assertEquals( 285 containerId.getApplicationAttemptId().toString(), 286 entity.getPrimaryFilters() 287 .get(ContainerMetricsConstants.PARENT_PRIMARIY_FILTER).iterator() 288 .next()); 289 Assert.assertEquals( 290 container.getAllocatedNode().getHost(), 291 entity.getOtherInfo().get( 292 ContainerMetricsConstants.ALLOCATED_HOST_ENTITY_INFO)); 293 Assert.assertEquals( 294 container.getAllocatedNode().getPort(), 295 entity.getOtherInfo().get( 296 ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO)); 297 Assert.assertEquals( 298 container.getAllocatedResource().getMemory(), 299 entity.getOtherInfo().get( 300 ContainerMetricsConstants.ALLOCATED_MEMORY_ENTITY_INFO)); 301 Assert.assertEquals( 302 container.getAllocatedResource().getVirtualCores(), 303 entity.getOtherInfo().get( 304 ContainerMetricsConstants.ALLOCATED_VCORE_ENTITY_INFO)); 305 Assert.assertEquals( 306 container.getAllocatedPriority().getPriority(), 307 entity.getOtherInfo().get( 308 ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO)); 309 boolean hasCreatedEvent = false; 310 boolean hasFinishedEvent = false; 311 for (TimelineEvent event : entity.getEvents()) { 312 if (event.getEventType().equals( 313 ContainerMetricsConstants.CREATED_EVENT_TYPE)) { 314 hasCreatedEvent = true; 315 Assert.assertEquals(container.getCreationTime(), event.getTimestamp()); 316 } else if (event.getEventType().equals( 317 ContainerMetricsConstants.FINISHED_EVENT_TYPE)) { 318 hasFinishedEvent = true; 319 Assert.assertEquals(container.getFinishTime(), event.getTimestamp()); 320 Assert.assertEquals( 321 container.getDiagnosticsInfo(), 322 event.getEventInfo().get( 323 ContainerMetricsConstants.DIAGNOSTICS_INFO_EVENT_INFO)); 324 Assert.assertEquals( 325 container.getContainerExitStatus(), 326 event.getEventInfo().get( 327 ContainerMetricsConstants.EXIT_STATUS_EVENT_INFO)); 328 Assert.assertEquals(container.getContainerState().toString(), event 329 .getEventInfo().get(ContainerMetricsConstants.STATE_EVENT_INFO)); 330 } 331 } 332 Assert.assertTrue(hasCreatedEvent && hasFinishedEvent); 333 } 334 createRMApp(ApplicationId appId)335 private static RMApp createRMApp(ApplicationId appId) { 336 RMApp app = mock(RMApp.class); 337 when(app.getApplicationId()).thenReturn(appId); 338 when(app.getName()).thenReturn("test app"); 339 when(app.getApplicationType()).thenReturn("test app type"); 340 when(app.getUser()).thenReturn("test user"); 341 when(app.getQueue()).thenReturn("test queue"); 342 when(app.getSubmitTime()).thenReturn(Integer.MAX_VALUE + 1L); 343 when(app.getStartTime()).thenReturn(Integer.MAX_VALUE + 2L); 344 when(app.getFinishTime()).thenReturn(Integer.MAX_VALUE + 3L); 345 when(app.getDiagnostics()).thenReturn( 346 new StringBuilder("test diagnostics info")); 347 RMAppAttempt appAttempt = mock(RMAppAttempt.class); 348 when(appAttempt.getAppAttemptId()).thenReturn( 349 ApplicationAttemptId.newInstance(appId, 1)); 350 when(app.getCurrentAppAttempt()).thenReturn(appAttempt); 351 when(app.getFinalApplicationStatus()).thenReturn( 352 FinalApplicationStatus.UNDEFINED); 353 when(app.getRMAppMetrics()).thenReturn( 354 new RMAppMetrics(null, 0, 0, Integer.MAX_VALUE, Long.MAX_VALUE)); 355 return app; 356 } 357 createRMAppAttempt( ApplicationAttemptId appAttemptId)358 private static RMAppAttempt createRMAppAttempt( 359 ApplicationAttemptId appAttemptId) { 360 RMAppAttempt appAttempt = mock(RMAppAttempt.class); 361 when(appAttempt.getAppAttemptId()).thenReturn(appAttemptId); 362 when(appAttempt.getHost()).thenReturn("test host"); 363 when(appAttempt.getRpcPort()).thenReturn(-100); 364 Container container = mock(Container.class); 365 when(container.getId()) 366 .thenReturn(ContainerId.newContainerId(appAttemptId, 1)); 367 when(appAttempt.getMasterContainer()).thenReturn(container); 368 when(appAttempt.getDiagnostics()).thenReturn("test diagnostics info"); 369 when(appAttempt.getTrackingUrl()).thenReturn("test tracking url"); 370 when(appAttempt.getOriginalTrackingUrl()).thenReturn( 371 "test original tracking url"); 372 return appAttempt; 373 } 374 createRMContainer(ContainerId containerId)375 private static RMContainer createRMContainer(ContainerId containerId) { 376 RMContainer container = mock(RMContainer.class); 377 when(container.getContainerId()).thenReturn(containerId); 378 when(container.getAllocatedNode()).thenReturn( 379 NodeId.newInstance("test host", -100)); 380 when(container.getAllocatedResource()).thenReturn( 381 Resource.newInstance(-1, -1)); 382 when(container.getAllocatedPriority()).thenReturn(Priority.UNDEFINED); 383 when(container.getCreationTime()).thenReturn(Integer.MAX_VALUE + 1L); 384 when(container.getFinishTime()).thenReturn(Integer.MAX_VALUE + 2L); 385 when(container.getDiagnosticsInfo()).thenReturn("test diagnostics info"); 386 when(container.getContainerExitStatus()).thenReturn(-1); 387 when(container.getContainerState()).thenReturn(ContainerState.COMPLETE); 388 Container mockContainer = mock(Container.class); 389 when(container.getContainer()).thenReturn(mockContainer); 390 when(mockContainer.getNodeHttpAddress()) 391 .thenReturn("http://localhost:1234"); 392 return container; 393 } 394 395 } 396