1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 
19 package org.apache.hadoop.yarn.server.nodemanager;
20 
21 import static org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils.newNodeHeartbeatResponse;
22 import static org.mockito.Mockito.mock;
23 import static org.mockito.Mockito.when;
24 
25 import java.io.EOFException;
26 import java.io.File;
27 import java.io.IOException;
28 import java.net.InetAddress;
29 import java.net.InetSocketAddress;
30 import java.net.UnknownHostException;
31 import java.nio.ByteBuffer;
32 import java.util.ArrayList;
33 import java.util.Collections;
34 import java.util.HashMap;
35 import java.util.HashSet;
36 import java.util.LinkedList;
37 import java.util.List;
38 import java.util.Map;
39 import java.util.Set;
40 import java.util.concurrent.ConcurrentMap;
41 import java.util.concurrent.CountDownLatch;
42 import java.util.concurrent.CyclicBarrier;
43 import java.util.concurrent.ExecutorService;
44 import java.util.concurrent.Executors;
45 import java.util.concurrent.TimeUnit;
46 import java.util.concurrent.atomic.AtomicBoolean;
47 import java.util.concurrent.atomic.AtomicInteger;
48 
49 import org.apache.commons.logging.Log;
50 import org.apache.commons.logging.LogFactory;
51 import org.apache.hadoop.conf.Configuration;
52 import org.apache.hadoop.fs.FileContext;
53 import org.apache.hadoop.fs.Path;
54 import org.apache.hadoop.io.DataOutputBuffer;
55 import org.apache.hadoop.io.Text;
56 import org.apache.hadoop.io.retry.RetryPolicy;
57 import org.apache.hadoop.io.retry.RetryProxy;
58 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
59 import org.apache.hadoop.net.NetUtils;
60 import org.apache.hadoop.security.Credentials;
61 import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
62 import org.apache.hadoop.service.Service.STATE;
63 import org.apache.hadoop.service.ServiceOperations;
64 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
65 import org.apache.hadoop.yarn.api.records.ApplicationId;
66 import org.apache.hadoop.yarn.api.records.ContainerId;
67 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
68 import org.apache.hadoop.yarn.api.records.ContainerState;
69 import org.apache.hadoop.yarn.api.records.ContainerStatus;
70 import org.apache.hadoop.yarn.api.records.NodeId;
71 import org.apache.hadoop.yarn.api.records.Resource;
72 import org.apache.hadoop.yarn.api.records.Token;
73 import org.apache.hadoop.yarn.client.RMProxy;
74 import org.apache.hadoop.yarn.conf.YarnConfiguration;
75 import org.apache.hadoop.yarn.event.Dispatcher;
76 import org.apache.hadoop.yarn.event.EventHandler;
77 import org.apache.hadoop.yarn.exceptions.YarnException;
78 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
79 import org.apache.hadoop.yarn.factories.RecordFactory;
80 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
81 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
82 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
83 import org.apache.hadoop.yarn.server.api.ResourceTracker;
84 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
85 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
86 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
87 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
88 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl;
89 import org.apache.hadoop.yarn.server.api.records.MasterKey;
90 import org.apache.hadoop.yarn.server.api.records.NodeAction;
91 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
92 import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
93 import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
94 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
95 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
96 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
97 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
98 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
99 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
100 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
101 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
102 import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
103 import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
104 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
105 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
106 import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
107 import org.junit.After;
108 import org.junit.Assert;
109 import org.junit.Before;
110 import org.junit.Test;
111 
112 @SuppressWarnings("rawtypes")
113 public class TestNodeStatusUpdater {
114 
  // temp fix until metrics system can auto-detect itself running in unit test:
  // this runs once, when the test class is loaded.
  static {
    DefaultMetricsSystem.setMiniClusterMode(true);
  }

  static final Log LOG = LogFactory.getLog(TestNodeStatusUpdater.class);
  // Scratch directory tree under target/; the sub-directories below are
  // (re)created by setUp().
  static final File basedir =
      new File("target", TestNodeStatusUpdater.class.getName());
  static final File nmLocalDir = new File(basedir, "nm0");
  static final File tmpDir = new File(basedir, "tmpDir");
  static final File remoteLogsDir = new File(basedir, "remotelogs");
  static final File logsDir = new File(basedir, "logs");
  private static final RecordFactory recordFactory = RecordFactoryProvider
      .getRecordFactory(null);

  // Heartbeat counter shared by the fake ResourceTrackers: each
  // nodeHeartbeat() stamps the request with it and increments it; tearDown()
  // resets it to 0. NOTE(review): volatile presumably because heartbeats are
  // served off the JUnit thread — confirm.
  volatile int heartBeatID = 0;
  // NOTE(review): not referenced in the visible part of this file; presumably
  // records a failure from an NM started on another thread.
  volatile Throwable nmStartError = null;
  // Node ids accepted by MyResourceTracker.registerNodeManager(); cleared in
  // tearDown().
  private final List<NodeId> registeredNodes = new ArrayList<NodeId>();
  // Returned by MyNodeStatusUpdater4.isTriggered().
  private boolean triggered = false;
  // NM configuration built by createNMConfig() in setUp().
  private Configuration conf;
  // NodeManager under test; stopped in tearDown().
  private NodeManager nm;
  // Set to true by fake trackers / node managers when a check fails outside
  // the JUnit thread; reset in tearDown().
  private AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
137 
138   @Before
setUp()139   public void setUp() {
140     nmLocalDir.mkdirs();
141     tmpDir.mkdirs();
142     logsDir.mkdirs();
143     remoteLogsDir.mkdirs();
144     conf = createNMConfig();
145   }
146 
147   @After
tearDown()148   public void tearDown() {
149     this.registeredNodes.clear();
150     heartBeatID = 0;
151     ServiceOperations.stop(nm);
152     assertionFailedInThread.set(false);
153     DefaultMetricsSystem.shutdown();
154   }
155 
createMasterKey()156   public static MasterKey createMasterKey() {
157     MasterKey masterKey = new MasterKeyPBImpl();
158     masterKey.setKeyId(123);
159     masterKey.setBytes(ByteBuffer.wrap(new byte[] { new Integer(123)
160       .byteValue() }));
161     return masterKey;
162   }
163 
164   private class MyResourceTracker implements ResourceTracker {
165 
166     private final Context context;
167 
MyResourceTracker(Context context)168     public MyResourceTracker(Context context) {
169       this.context = context;
170     }
171 
172     @Override
registerNodeManager( RegisterNodeManagerRequest request)173     public RegisterNodeManagerResponse registerNodeManager(
174         RegisterNodeManagerRequest request) throws YarnException,
175         IOException {
176       NodeId nodeId = request.getNodeId();
177       Resource resource = request.getResource();
178       LOG.info("Registering " + nodeId.toString());
179       // NOTE: this really should be checking against the config value
180       InetSocketAddress expected = NetUtils.getConnectAddress(
181           conf.getSocketAddr(YarnConfiguration.NM_ADDRESS, null, -1));
182       Assert.assertEquals(NetUtils.getHostPortString(expected), nodeId.toString());
183       Assert.assertEquals(5 * 1024, resource.getMemory());
184       registeredNodes.add(nodeId);
185 
186       RegisterNodeManagerResponse response = recordFactory
187           .newRecordInstance(RegisterNodeManagerResponse.class);
188       response.setContainerTokenMasterKey(createMasterKey());
189       response.setNMTokenMasterKey(createMasterKey());
190       return response;
191     }
192 
getAppToContainerStatusMap( List<ContainerStatus> containers)193     private Map<ApplicationId, List<ContainerStatus>> getAppToContainerStatusMap(
194         List<ContainerStatus> containers) {
195       Map<ApplicationId, List<ContainerStatus>> map =
196           new HashMap<ApplicationId, List<ContainerStatus>>();
197       for (ContainerStatus cs : containers) {
198         ApplicationId applicationId =
199             cs.getContainerId().getApplicationAttemptId().getApplicationId();
200         List<ContainerStatus> appContainers = map.get(applicationId);
201         if (appContainers == null) {
202           appContainers = new ArrayList<ContainerStatus>();
203           map.put(applicationId, appContainers);
204         }
205         appContainers.add(cs);
206       }
207       return map;
208     }
209 
210     @Override
nodeHeartbeat(NodeHeartbeatRequest request)211     public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
212         throws YarnException, IOException {
213       NodeStatus nodeStatus = request.getNodeStatus();
214       LOG.info("Got heartbeat number " + heartBeatID);
215       NodeManagerMetrics mockMetrics = mock(NodeManagerMetrics.class);
216       Dispatcher mockDispatcher = mock(Dispatcher.class);
217       EventHandler mockEventHandler = mock(EventHandler.class);
218       when(mockDispatcher.getEventHandler()).thenReturn(mockEventHandler);
219       NMStateStoreService stateStore = new NMNullStateStoreService();
220       nodeStatus.setResponseId(heartBeatID++);
221       Map<ApplicationId, List<ContainerStatus>> appToContainers =
222           getAppToContainerStatusMap(nodeStatus.getContainersStatuses());
223 
224       ApplicationId appId1 = ApplicationId.newInstance(0, 1);
225       ApplicationId appId2 = ApplicationId.newInstance(0, 2);
226 
227       if (heartBeatID == 1) {
228         Assert.assertEquals(0, nodeStatus.getContainersStatuses().size());
229 
230         // Give a container to the NM.
231         ApplicationAttemptId appAttemptID =
232             ApplicationAttemptId.newInstance(appId1, 0);
233         ContainerId firstContainerID =
234             ContainerId.newContainerId(appAttemptID, heartBeatID);
235         ContainerLaunchContext launchContext = recordFactory
236             .newRecordInstance(ContainerLaunchContext.class);
237         Resource resource = BuilderUtils.newResource(2, 1);
238         long currentTime = System.currentTimeMillis();
239         String user = "testUser";
240         ContainerTokenIdentifier containerToken = BuilderUtils
241             .newContainerTokenIdentifier(BuilderUtils.newContainerToken(
242                 firstContainerID, InetAddress.getByName("localhost")
243                     .getCanonicalHostName(), 1234, user, resource,
244                 currentTime + 10000, 123, "password".getBytes(), currentTime));
245         Container container = new ContainerImpl(conf, mockDispatcher,
246             stateStore, launchContext, null, mockMetrics, containerToken);
247         this.context.getContainers().put(firstContainerID, container);
248       } else if (heartBeatID == 2) {
249         // Checks on the RM end
250         Assert.assertEquals("Number of applications should only be one!", 1,
251             nodeStatus.getContainersStatuses().size());
252         Assert.assertEquals("Number of container for the app should be one!",
253             1, appToContainers.get(appId1).size());
254 
255         // Checks on the NM end
256         ConcurrentMap<ContainerId, Container> activeContainers =
257             this.context.getContainers();
258         Assert.assertEquals(1, activeContainers.size());
259 
260         // Give another container to the NM.
261         ApplicationAttemptId appAttemptID =
262             ApplicationAttemptId.newInstance(appId2, 0);
263         ContainerId secondContainerID =
264             ContainerId.newContainerId(appAttemptID, heartBeatID);
265         ContainerLaunchContext launchContext = recordFactory
266             .newRecordInstance(ContainerLaunchContext.class);
267         long currentTime = System.currentTimeMillis();
268         String user = "testUser";
269         Resource resource = BuilderUtils.newResource(3, 1);
270         ContainerTokenIdentifier containerToken = BuilderUtils
271             .newContainerTokenIdentifier(BuilderUtils.newContainerToken(
272                 secondContainerID, InetAddress.getByName("localhost")
273                     .getCanonicalHostName(), 1234, user, resource,
274                 currentTime + 10000, 123, "password".getBytes(), currentTime));
275         Container container = new ContainerImpl(conf, mockDispatcher,
276             stateStore, launchContext, null, mockMetrics, containerToken);
277         this.context.getContainers().put(secondContainerID, container);
278       } else if (heartBeatID == 3) {
279         // Checks on the RM end
280         Assert.assertEquals("Number of applications should have two!", 2,
281             appToContainers.size());
282         Assert.assertEquals("Number of container for the app-1 should be only one!",
283             1, appToContainers.get(appId1).size());
284         Assert.assertEquals("Number of container for the app-2 should be only one!",
285             1, appToContainers.get(appId2).size());
286 
287         // Checks on the NM end
288         ConcurrentMap<ContainerId, Container> activeContainers =
289             this.context.getContainers();
290         Assert.assertEquals(2, activeContainers.size());
291       }
292 
293       NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
294           newNodeHeartbeatResponse(heartBeatID, null, null, null, null, null,
295             1000L);
296       return nhResponse;
297     }
298   }
299 
300   private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl {
301     public ResourceTracker resourceTracker;
302     private Context context;
303 
MyNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics)304     public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
305         NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
306       super(context, dispatcher, healthChecker, metrics);
307       this.context = context;
308       resourceTracker = new MyResourceTracker(this.context);
309     }
310 
311     @Override
getRMClient()312     protected ResourceTracker getRMClient() {
313       return resourceTracker;
314     }
315 
316     @Override
stopRMProxy()317     protected void stopRMProxy() {
318       return;
319     }
320   }
321 
322   // Test NodeStatusUpdater sends the right container statuses each time it
323   // heart beats.
324   private class MyNodeStatusUpdater2 extends NodeStatusUpdaterImpl {
325     public ResourceTracker resourceTracker;
326 
MyNodeStatusUpdater2(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics)327     public MyNodeStatusUpdater2(Context context, Dispatcher dispatcher,
328         NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
329       super(context, dispatcher, healthChecker, metrics);
330       resourceTracker = new MyResourceTracker4(context);
331     }
332 
333     @Override
getRMClient()334     protected ResourceTracker getRMClient() {
335       return resourceTracker;
336     }
337 
338     @Override
stopRMProxy()339     protected void stopRMProxy() {
340       return;
341     }
342   }
343 
344   private class MyNodeStatusUpdater3 extends NodeStatusUpdaterImpl {
345     public ResourceTracker resourceTracker;
346     private Context context;
347 
MyNodeStatusUpdater3(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics)348     public MyNodeStatusUpdater3(Context context, Dispatcher dispatcher,
349         NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
350       super(context, dispatcher, healthChecker, metrics);
351       this.context = context;
352       this.resourceTracker = new MyResourceTracker3(this.context);
353     }
354 
355     @Override
getRMClient()356     protected ResourceTracker getRMClient() {
357       return resourceTracker;
358     }
359 
360     @Override
stopRMProxy()361     protected void stopRMProxy() {
362       return;
363     }
364 
365     @Override
isTokenKeepAliveEnabled(Configuration conf)366     protected boolean isTokenKeepAliveEnabled(Configuration conf) {
367       return true;
368     }
369   }
370 
371   private class MyNodeStatusUpdater4 extends NodeStatusUpdaterImpl {
372 
373     private final long rmStartIntervalMS;
374     private final boolean rmNeverStart;
375     public ResourceTracker resourceTracker;
MyNodeStatusUpdater4(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, long rmStartIntervalMS, boolean rmNeverStart)376     public MyNodeStatusUpdater4(Context context, Dispatcher dispatcher,
377         NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
378         long rmStartIntervalMS, boolean rmNeverStart) {
379       super(context, dispatcher, healthChecker, metrics);
380       this.rmStartIntervalMS = rmStartIntervalMS;
381       this.rmNeverStart = rmNeverStart;
382     }
383 
384     @Override
serviceStart()385     protected void serviceStart() throws Exception {
386       //record the startup time
387       super.serviceStart();
388     }
389 
390     @Override
getRMClient()391     protected ResourceTracker getRMClient() throws IOException {
392       RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf);
393       resourceTracker =
394           (ResourceTracker) RetryProxy.create(ResourceTracker.class,
395             new MyResourceTracker6(rmStartIntervalMS, rmNeverStart),
396             retryPolicy);
397       return resourceTracker;
398     }
399 
isTriggered()400     private boolean isTriggered() {
401       return triggered;
402     }
403 
404     @Override
stopRMProxy()405     protected void stopRMProxy() {
406       return;
407     }
408   }
409 
410 
411 
412   private class MyNodeStatusUpdater5 extends NodeStatusUpdaterImpl {
413     private ResourceTracker resourceTracker;
414     private Configuration conf;
415 
MyNodeStatusUpdater5(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, Configuration conf)416     public MyNodeStatusUpdater5(Context context, Dispatcher dispatcher,
417         NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics, Configuration conf) {
418       super(context, dispatcher, healthChecker, metrics);
419       resourceTracker = new MyResourceTracker5();
420       this.conf = conf;
421     }
422 
423     @Override
getRMClient()424     protected ResourceTracker getRMClient() {
425       RetryPolicy retryPolicy = RMProxy.createRetryPolicy(conf);
426       return (ResourceTracker) RetryProxy.create(ResourceTracker.class,
427         resourceTracker, retryPolicy);
428     }
429 
430     @Override
stopRMProxy()431     protected void stopRMProxy() {
432       return;
433     }
434   }
435 
436   private class MyNodeManager extends NodeManager {
437 
438     private MyNodeStatusUpdater3 nodeStatusUpdater;
439     @Override
createNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker)440     protected NodeStatusUpdater createNodeStatusUpdater(Context context,
441         Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
442       this.nodeStatusUpdater =
443           new MyNodeStatusUpdater3(context, dispatcher, healthChecker, metrics);
444       return this.nodeStatusUpdater;
445     }
446 
getNodeStatusUpdater()447     public MyNodeStatusUpdater3 getNodeStatusUpdater() {
448       return this.nodeStatusUpdater;
449     }
450   }
451 
452   private class MyNodeManager2 extends NodeManager {
453     public boolean isStopped = false;
454     private NodeStatusUpdater nodeStatusUpdater;
455     private CyclicBarrier syncBarrier;
456     private Configuration conf;
457 
MyNodeManager2(CyclicBarrier syncBarrier, Configuration conf)458     public MyNodeManager2 (CyclicBarrier syncBarrier, Configuration conf) {
459       this.syncBarrier = syncBarrier;
460       this.conf = conf;
461     }
462     @Override
createNodeStatusUpdater(Context context, Dispatcher dispatcher, NodeHealthCheckerService healthChecker)463     protected NodeStatusUpdater createNodeStatusUpdater(Context context,
464         Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
465       nodeStatusUpdater =
466           new MyNodeStatusUpdater5(context, dispatcher, healthChecker,
467                                      metrics, conf);
468       return nodeStatusUpdater;
469     }
470 
471     @Override
serviceStop()472     protected void serviceStop() throws Exception {
473       System.out.println("Called stooppppp");
474       super.serviceStop();
475       isStopped = true;
476       ConcurrentMap<ApplicationId, Application> applications =
477           getNMContext().getApplications();
478       // ensure that applications are empty
479       if(!applications.isEmpty()) {
480         assertionFailedInThread.set(true);
481       }
482       syncBarrier.await(10000, TimeUnit.MILLISECONDS);
483     }
484   }
485   //
486   private class MyResourceTracker2 implements ResourceTracker {
487     public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
488     public NodeAction registerNodeAction = NodeAction.NORMAL;
489     public String shutDownMessage = "";
490     public String rmVersion = "3.0.1";
491 
492     @Override
registerNodeManager( RegisterNodeManagerRequest request)493     public RegisterNodeManagerResponse registerNodeManager(
494         RegisterNodeManagerRequest request) throws YarnException,
495         IOException {
496 
497       RegisterNodeManagerResponse response = recordFactory
498           .newRecordInstance(RegisterNodeManagerResponse.class);
499       response.setNodeAction(registerNodeAction );
500       response.setContainerTokenMasterKey(createMasterKey());
501       response.setNMTokenMasterKey(createMasterKey());
502       response.setDiagnosticsMessage(shutDownMessage);
503       response.setRMVersion(rmVersion);
504       return response;
505     }
506     @Override
nodeHeartbeat(NodeHeartbeatRequest request)507     public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
508         throws YarnException, IOException {
509       NodeStatus nodeStatus = request.getNodeStatus();
510       nodeStatus.setResponseId(heartBeatID++);
511 
512       NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
513           newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null,
514               null, null, null, 1000L);
515       nhResponse.setDiagnosticsMessage(shutDownMessage);
516       return nhResponse;
517     }
518   }
519 
520   private class MyResourceTracker3 implements ResourceTracker {
521     public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
522     public NodeAction registerNodeAction = NodeAction.NORMAL;
523     private Map<ApplicationId, List<Long>> keepAliveRequests =
524         new HashMap<ApplicationId, List<Long>>();
525     private ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
526     private final Context context;
527 
MyResourceTracker3(Context context)528     MyResourceTracker3(Context context) {
529       this.context = context;
530     }
531 
532     @Override
registerNodeManager( RegisterNodeManagerRequest request)533     public RegisterNodeManagerResponse registerNodeManager(
534         RegisterNodeManagerRequest request) throws YarnException,
535         IOException {
536 
537       RegisterNodeManagerResponse response =
538           recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
539       response.setNodeAction(registerNodeAction);
540       response.setContainerTokenMasterKey(createMasterKey());
541       response.setNMTokenMasterKey(createMasterKey());
542       return response;
543     }
544 
545     @Override
nodeHeartbeat(NodeHeartbeatRequest request)546     public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
547         throws YarnException, IOException {
548       LOG.info("Got heartBeatId: [" + heartBeatID +"]");
549       NodeStatus nodeStatus = request.getNodeStatus();
550       nodeStatus.setResponseId(heartBeatID++);
551       NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
552           newNodeHeartbeatResponse(heartBeatID, heartBeatNodeAction, null,
553               null, null, null, 1000L);
554 
555       if (nodeStatus.getKeepAliveApplications() != null
556           && nodeStatus.getKeepAliveApplications().size() > 0) {
557         for (ApplicationId appId : nodeStatus.getKeepAliveApplications()) {
558           List<Long> list = keepAliveRequests.get(appId);
559           if (list == null) {
560             list = new LinkedList<Long>();
561             keepAliveRequests.put(appId, list);
562           }
563           list.add(System.currentTimeMillis());
564         }
565       }
566       if (heartBeatID == 2) {
567         LOG.info("Sending FINISH_APP for application: [" + appId + "]");
568         this.context.getApplications().put(appId, mock(Application.class));
569         nhResponse.addAllApplicationsToCleanup(Collections.singletonList(appId));
570       }
571       return nhResponse;
572     }
573   }
574 
  // Credentials shared with MyResourceTracker4 below: every MyResourceTracker4
  // constructed adds a token of kind "kind1" under alias "token1" to it.
  // (MyResourceTracker4 itself tests that NodeStatusUpdater sends the right
  // container statuses each time it heartbeats.)
  private Credentials expectedCredentials = new Credentials();
578   private class MyResourceTracker4 implements ResourceTracker {
579 
    // Node action returned by registerNodeManager().
    public NodeAction registerNodeAction = NodeAction.NORMAL;
    // NOTE(review): not referenced in the visible part of this class;
    // presumably the node action for heartbeat responses.
    public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
    // NM context whose containers nodeHeartbeat() checks.
    private Context context;
    // Statuses the heartbeat checks compare against: 2 and 4 RUNNING,
    // 3 and 5 COMPLETE. NOTE(review): the int argument presumably becomes the
    // container id number — confirm in createContainerStatus().
    private final ContainerStatus containerStatus2 =
        createContainerStatus(2, ContainerState.RUNNING);
    private final ContainerStatus containerStatus3 =
        createContainerStatus(3, ContainerState.COMPLETE);
    private final ContainerStatus containerStatus4 =
        createContainerStatus(4, ContainerState.RUNNING);
    private final ContainerStatus containerStatus5 =
        createContainerStatus(5, ContainerState.COMPLETE);
591 
MyResourceTracker4(Context context)592     public MyResourceTracker4(Context context) {
593       // create app Credentials
594       org.apache.hadoop.security.token.Token<DelegationTokenIdentifier> token1 =
595           new org.apache.hadoop.security.token.Token<DelegationTokenIdentifier>();
596       token1.setKind(new Text("kind1"));
597       expectedCredentials.addToken(new Text("token1"), token1);
598       this.context = context;
599     }
600 
601     @Override
registerNodeManager( RegisterNodeManagerRequest request)602     public RegisterNodeManagerResponse registerNodeManager(
603         RegisterNodeManagerRequest request) throws YarnException, IOException {
604       RegisterNodeManagerResponse response =
605           recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
606       response.setNodeAction(registerNodeAction);
607       response.setContainerTokenMasterKey(createMasterKey());
608       response.setNMTokenMasterKey(createMasterKey());
609       return response;
610     }
611 
612     @Override
nodeHeartbeat(NodeHeartbeatRequest request)613     public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
614         throws YarnException, IOException {
615       List<ContainerId> finishedContainersPulledByAM = new ArrayList
616           <ContainerId>();
617       try {
618         if (heartBeatID == 0) {
619           Assert.assertEquals(0, request.getNodeStatus().getContainersStatuses()
620             .size());
621           Assert.assertEquals(0, context.getContainers().size());
622         } else if (heartBeatID == 1) {
623           List<ContainerStatus> statuses =
624               request.getNodeStatus().getContainersStatuses();
625           Assert.assertEquals(2, statuses.size());
626           Assert.assertEquals(2, context.getContainers().size());
627 
628           boolean container2Exist = false, container3Exist = false;
629           for (ContainerStatus status : statuses) {
630             if (status.getContainerId().equals(
631               containerStatus2.getContainerId())) {
632               Assert.assertTrue(status.getState().equals(
633                 containerStatus2.getState()));
634               container2Exist = true;
635             }
636             if (status.getContainerId().equals(
637               containerStatus3.getContainerId())) {
638               Assert.assertTrue(status.getState().equals(
639                 containerStatus3.getState()));
640               container3Exist = true;
641             }
642           }
643           Assert.assertTrue(container2Exist && container3Exist);
644 
645           // should throw exception that can be retried by the
646           // nodeStatusUpdaterRunnable, otherwise nm just shuts down and the
647           // test passes.
648           throw new YarnRuntimeException("Lost the heartbeat response");
649         } else if (heartBeatID == 2 || heartBeatID == 3) {
650           List<ContainerStatus> statuses =
651               request.getNodeStatus().getContainersStatuses();
652           if (heartBeatID == 2) {
653             // NM should send completed containers again, since the last
654             // heartbeat is lost.
655             Assert.assertEquals(4, statuses.size());
656           } else {
657             // NM should not send completed containers again, since the last
658             // heartbeat is successful.
659             Assert.assertEquals(2, statuses.size());
660           }
661           Assert.assertEquals(4, context.getContainers().size());
662 
663           boolean container2Exist = false, container3Exist = false,
664               container4Exist = false, container5Exist = false;
665           for (ContainerStatus status : statuses) {
666             if (status.getContainerId().equals(
667               containerStatus2.getContainerId())) {
668               Assert.assertTrue(status.getState().equals(
669                 containerStatus2.getState()));
670               container2Exist = true;
671             }
672             if (status.getContainerId().equals(
673               containerStatus3.getContainerId())) {
674               Assert.assertTrue(status.getState().equals(
675                 containerStatus3.getState()));
676               container3Exist = true;
677             }
678             if (status.getContainerId().equals(
679               containerStatus4.getContainerId())) {
680               Assert.assertTrue(status.getState().equals(
681                 containerStatus4.getState()));
682               container4Exist = true;
683             }
684             if (status.getContainerId().equals(
685               containerStatus5.getContainerId())) {
686               Assert.assertTrue(status.getState().equals(
687                 containerStatus5.getState()));
688               container5Exist = true;
689             }
690           }
691           if (heartBeatID == 2) {
692             Assert.assertTrue(container2Exist && container3Exist
693                 && container4Exist && container5Exist);
694           } else {
695             // NM do not send completed containers again
696             Assert.assertTrue(container2Exist && !container3Exist
697                 && container4Exist && !container5Exist);
698           }
699 
700           if (heartBeatID == 3) {
701             finishedContainersPulledByAM.add(containerStatus3.getContainerId());
702           }
703         } else if (heartBeatID == 4) {
704           List<ContainerStatus> statuses =
705               request.getNodeStatus().getContainersStatuses();
706           Assert.assertEquals(2, statuses.size());
707           // Container 3 is acked by AM, hence removed from context
708           Assert.assertEquals(3, context.getContainers().size());
709 
710           boolean container3Exist = false;
711           for (ContainerStatus status : statuses) {
712             if (status.getContainerId().equals(
713                 containerStatus3.getContainerId())) {
714               container3Exist = true;
715             }
716           }
717           Assert.assertFalse(container3Exist);
718         }
719       } catch (AssertionError error) {
720         error.printStackTrace();
721         assertionFailedInThread.set(true);
722       } finally {
723         heartBeatID++;
724       }
725       NodeStatus nodeStatus = request.getNodeStatus();
726       nodeStatus.setResponseId(heartBeatID);
727       NodeHeartbeatResponse nhResponse =
728           YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID,
729             heartBeatNodeAction, null, null, null, null, 1000L);
730       nhResponse.addContainersToBeRemovedFromNM(finishedContainersPulledByAM);
731       Map<ApplicationId, ByteBuffer> appCredentials =
732           new HashMap<ApplicationId, ByteBuffer>();
733       DataOutputBuffer dob = new DataOutputBuffer();
734       expectedCredentials.writeTokenStorageToStream(dob);
735       ByteBuffer byteBuffer1 =
736           ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
737       appCredentials.put(ApplicationId.newInstance(1234, 1), byteBuffer1);
738       nhResponse.setSystemCredentialsForApps(appCredentials);
739       return nhResponse;
740     }
741   }
742 
743   private class MyResourceTracker5 implements ResourceTracker {
744     public NodeAction registerNodeAction = NodeAction.NORMAL;
745     @Override
registerNodeManager( RegisterNodeManagerRequest request)746     public RegisterNodeManagerResponse registerNodeManager(
747         RegisterNodeManagerRequest request) throws YarnException,
748         IOException {
749 
750       RegisterNodeManagerResponse response = recordFactory
751           .newRecordInstance(RegisterNodeManagerResponse.class);
752       response.setNodeAction(registerNodeAction );
753       response.setContainerTokenMasterKey(createMasterKey());
754       response.setNMTokenMasterKey(createMasterKey());
755       return response;
756     }
757 
758     @Override
nodeHeartbeat(NodeHeartbeatRequest request)759     public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
760         throws YarnException, IOException {
761       heartBeatID++;
762       if(heartBeatID == 1) {
763         // EOFException should be retried as well.
764         throw new EOFException("NodeHeartbeat exception");
765       }
766       else {
767       throw new java.net.ConnectException(
768           "NodeHeartbeat exception");
769       }
770     }
771   }
772 
773   private class MyResourceTracker6 implements ResourceTracker {
774 
775     private long rmStartIntervalMS;
776     private boolean rmNeverStart;
777     private final long waitStartTime;
778 
MyResourceTracker6(long rmStartIntervalMS, boolean rmNeverStart)779     public MyResourceTracker6(long rmStartIntervalMS, boolean rmNeverStart) {
780       this.rmStartIntervalMS = rmStartIntervalMS;
781       this.rmNeverStart = rmNeverStart;
782       this.waitStartTime = System.currentTimeMillis();
783     }
784 
785     @Override
registerNodeManager( RegisterNodeManagerRequest request)786     public RegisterNodeManagerResponse registerNodeManager(
787         RegisterNodeManagerRequest request) throws YarnException, IOException,
788         IOException {
789       if (System.currentTimeMillis() - waitStartTime <= rmStartIntervalMS
790           || rmNeverStart) {
791         throw new java.net.ConnectException("Faking RM start failure as start "
792             + "delay timer has not expired.");
793       } else {
794         NodeId nodeId = request.getNodeId();
795         Resource resource = request.getResource();
796         LOG.info("Registering " + nodeId.toString());
797         // NOTE: this really should be checking against the config value
798         InetSocketAddress expected = NetUtils.getConnectAddress(
799             conf.getSocketAddr(YarnConfiguration.NM_ADDRESS, null, -1));
800         Assert.assertEquals(NetUtils.getHostPortString(expected),
801             nodeId.toString());
802         Assert.assertEquals(5 * 1024, resource.getMemory());
803         registeredNodes.add(nodeId);
804 
805         RegisterNodeManagerResponse response = recordFactory
806             .newRecordInstance(RegisterNodeManagerResponse.class);
807         triggered = true;
808         return response;
809       }
810     }
811 
812     @Override
nodeHeartbeat(NodeHeartbeatRequest request)813     public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
814         throws YarnException, IOException {
815       NodeStatus nodeStatus = request.getNodeStatus();
816       nodeStatus.setResponseId(heartBeatID++);
817 
818       NodeHeartbeatResponse nhResponse = YarnServerBuilderUtils.
819           newNodeHeartbeatResponse(heartBeatID, NodeAction.NORMAL, null,
820               null, null, null, 1000L);
821       return nhResponse;
822     }
823   }
824 
825   @Before
clearError()826   public void clearError() {
827     nmStartError = null;
828   }
829 
830   @After
deleteBaseDir()831   public void deleteBaseDir() throws IOException {
832     FileContext lfs = FileContext.getLocalFSFileContext();
833     lfs.delete(new Path(basedir.getPath()), true);
834   }
835 
836   @Test(timeout = 90000)
testRecentlyFinishedContainers()837   public void testRecentlyFinishedContainers() throws Exception {
838     NodeManager nm = new NodeManager();
839     YarnConfiguration conf = new YarnConfiguration();
840     conf.set(
841         NodeStatusUpdaterImpl.YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS,
842         "10000");
843     nm.init(conf);
844     NodeStatusUpdaterImpl nodeStatusUpdater =
845         (NodeStatusUpdaterImpl) nm.getNodeStatusUpdater();
846     ApplicationId appId = ApplicationId.newInstance(0, 0);
847     ApplicationAttemptId appAttemptId =
848         ApplicationAttemptId.newInstance(appId, 0);
849     ContainerId cId = ContainerId.newContainerId(appAttemptId, 0);
850     nm.getNMContext().getApplications().putIfAbsent(appId,
851         mock(Application.class));
852     nm.getNMContext().getContainers().putIfAbsent(cId, mock(Container.class));
853 
854     nodeStatusUpdater.addCompletedContainer(cId);
855     Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(cId));
856 
857     nm.getNMContext().getContainers().remove(cId);
858     long time1 = System.currentTimeMillis();
859     int waitInterval = 15;
860     while (waitInterval-- > 0
861         && nodeStatusUpdater.isContainerRecentlyStopped(cId)) {
862       nodeStatusUpdater.removeVeryOldStoppedContainersFromCache();
863       Thread.sleep(1000);
864     }
865     long time2 = System.currentTimeMillis();
866     // By this time the container will be removed from cache. need to verify.
867     Assert.assertFalse(nodeStatusUpdater.isContainerRecentlyStopped(cId));
868     Assert.assertTrue((time2 - time1) >= 10000 && (time2 - time1) <= 250000);
869   }
870 
871   @Test(timeout = 90000)
testRemovePreviousCompletedContainersFromContext()872   public void testRemovePreviousCompletedContainersFromContext() throws Exception {
873     NodeManager nm = new NodeManager();
874     YarnConfiguration conf = new YarnConfiguration();
875     conf.set(
876         NodeStatusUpdaterImpl
877             .YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS,
878         "10000");
879     nm.init(conf);
880     NodeStatusUpdaterImpl nodeStatusUpdater =
881         (NodeStatusUpdaterImpl) nm.getNodeStatusUpdater();
882     ApplicationId appId = ApplicationId.newInstance(0, 0);
883     ApplicationAttemptId appAttemptId =
884         ApplicationAttemptId.newInstance(appId, 0);
885 
886     ContainerId cId = ContainerId.newContainerId(appAttemptId, 1);
887     Token containerToken =
888         BuilderUtils.newContainerToken(cId, "anyHost", 1234, "anyUser",
889             BuilderUtils.newResource(1024, 1), 0, 123,
890             "password".getBytes(), 0);
891     Container anyCompletedContainer = new ContainerImpl(conf, null,
892         null, null, null, null,
893         BuilderUtils.newContainerTokenIdentifier(containerToken)) {
894 
895       @Override
896       public ContainerState getCurrentState() {
897         return ContainerState.COMPLETE;
898       }
899 
900       @Override
901       public org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState getContainerState() {
902         return org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.DONE;
903       }
904     };
905 
906     ContainerId runningContainerId =
907         ContainerId.newContainerId(appAttemptId, 3);
908     Token runningContainerToken =
909         BuilderUtils.newContainerToken(runningContainerId, "anyHost",
910           1234, "anyUser", BuilderUtils.newResource(1024, 1), 0, 123,
911           "password".getBytes(), 0);
912     Container runningContainer =
913         new ContainerImpl(conf, null, null, null, null, null,
914           BuilderUtils.newContainerTokenIdentifier(runningContainerToken)) {
915           @Override
916           public ContainerState getCurrentState() {
917             return ContainerState.RUNNING;
918           }
919 
920           @Override
921           public org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState getContainerState() {
922             return org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.RUNNING;
923           }
924         };
925 
926     nm.getNMContext().getApplications().putIfAbsent(appId,
927         mock(Application.class));
928     nm.getNMContext().getContainers().put(cId, anyCompletedContainer);
929     nm.getNMContext().getContainers()
930       .put(runningContainerId, runningContainer);
931 
932     Assert.assertEquals(2, nodeStatusUpdater.getContainerStatuses().size());
933 
934     List<ContainerId> ackedContainers = new ArrayList<ContainerId>();
935     ackedContainers.add(cId);
936     ackedContainers.add(runningContainerId);
937 
938     nodeStatusUpdater.removeOrTrackCompletedContainersFromContext(ackedContainers);
939 
940     Set<ContainerId> containerIdSet = new HashSet<ContainerId>();
941     List<ContainerStatus> containerStatuses = nodeStatusUpdater.getContainerStatuses();
942     for (ContainerStatus status : containerStatuses) {
943       containerIdSet.add(status.getContainerId());
944     }
945 
946     Assert.assertEquals(1, containerStatuses.size());
947     // completed container is removed;
948     Assert.assertFalse(containerIdSet.contains(cId));
949     // running container is not removed;
950     Assert.assertTrue(containerIdSet.contains(runningContainerId));
951   }
952 
953   @Test(timeout = 10000)
testCompletedContainersIsRecentlyStopped()954   public void testCompletedContainersIsRecentlyStopped() throws Exception {
955     NodeManager nm = new NodeManager();
956     nm.init(conf);
957     NodeStatusUpdaterImpl nodeStatusUpdater =
958         (NodeStatusUpdaterImpl) nm.getNodeStatusUpdater();
959     ApplicationId appId = ApplicationId.newInstance(0, 0);
960     Application completedApp = mock(Application.class);
961     when(completedApp.getApplicationState()).thenReturn(
962         ApplicationState.FINISHED);
963     ApplicationAttemptId appAttemptId =
964         ApplicationAttemptId.newInstance(appId, 0);
965     ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1);
966     Token containerToken =
967         BuilderUtils.newContainerToken(containerId, "host", 1234, "user",
968             BuilderUtils.newResource(1024, 1), 0, 123,
969             "password".getBytes(), 0);
970     Container completedContainer = new ContainerImpl(conf, null,
971         null, null, null, null,
972         BuilderUtils.newContainerTokenIdentifier(containerToken)) {
973       @Override
974       public ContainerState getCurrentState() {
975         return ContainerState.COMPLETE;
976       }
977     };
978 
979     nm.getNMContext().getApplications().putIfAbsent(appId, completedApp);
980     nm.getNMContext().getContainers().put(containerId, completedContainer);
981 
982     Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
983     Assert.assertTrue(nodeStatusUpdater.isContainerRecentlyStopped(
984         containerId));
985   }
986 
987   @Test
testCleanedupApplicationContainerCleanup()988   public void testCleanedupApplicationContainerCleanup() throws IOException {
989     NodeManager nm = new NodeManager();
990     YarnConfiguration conf = new YarnConfiguration();
991     conf.set(NodeStatusUpdaterImpl
992             .YARN_NODEMANAGER_DURATION_TO_TRACK_STOPPED_CONTAINERS,
993         "1000000");
994     nm.init(conf);
995 
996     NodeStatusUpdaterImpl nodeStatusUpdater =
997         (NodeStatusUpdaterImpl) nm.getNodeStatusUpdater();
998     ApplicationId appId = ApplicationId.newInstance(0, 0);
999     ApplicationAttemptId appAttemptId =
1000         ApplicationAttemptId.newInstance(appId, 0);
1001 
1002     ContainerId cId = ContainerId.newContainerId(appAttemptId, 1);
1003     Token containerToken =
1004         BuilderUtils.newContainerToken(cId, "anyHost", 1234, "anyUser",
1005             BuilderUtils.newResource(1024, 1), 0, 123,
1006             "password".getBytes(), 0);
1007     Container anyCompletedContainer = new ContainerImpl(conf, null,
1008         null, null, null, null,
1009         BuilderUtils.newContainerTokenIdentifier(containerToken)) {
1010 
1011       @Override
1012       public ContainerState getCurrentState() {
1013         return ContainerState.COMPLETE;
1014       }
1015     };
1016 
1017     Application application = mock(Application.class);
1018     when(application.getApplicationState()).thenReturn(ApplicationState.RUNNING);
1019     nm.getNMContext().getApplications().putIfAbsent(appId, application);
1020     nm.getNMContext().getContainers().put(cId, anyCompletedContainer);
1021 
1022     Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
1023 
1024     when(application.getApplicationState()).thenReturn(
1025         ApplicationState.FINISHING_CONTAINERS_WAIT);
1026     // The completed container will be saved in case of lost heartbeat.
1027     Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
1028     Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
1029 
1030     nm.getNMContext().getContainers().put(cId, anyCompletedContainer);
1031     nm.getNMContext().getApplications().remove(appId);
1032     // The completed container will be saved in case of lost heartbeat.
1033     Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
1034     Assert.assertEquals(1, nodeStatusUpdater.getContainerStatuses().size());
1035   }
1036 
1037   @Test
testNMRegistration()1038   public void testNMRegistration() throws InterruptedException {
1039     nm = new NodeManager() {
1040       @Override
1041       protected NodeStatusUpdater createNodeStatusUpdater(Context context,
1042           Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
1043         return new MyNodeStatusUpdater(context, dispatcher, healthChecker,
1044                                        metrics);
1045       }
1046     };
1047 
1048     YarnConfiguration conf = createNMConfig();
1049     nm.init(conf);
1050 
1051     // verify that the last service is the nodeStatusUpdater (ie registration
1052     // with RM)
1053     Object[] services  = nm.getServices().toArray();
1054     Object lastService = services[services.length-1];
1055     Assert.assertTrue("last service is NOT the node status updater",
1056         lastService instanceof NodeStatusUpdater);
1057 
1058     new Thread() {
1059       public void run() {
1060         try {
1061           nm.start();
1062         } catch (Throwable e) {
1063           TestNodeStatusUpdater.this.nmStartError = e;
1064           throw new YarnRuntimeException(e);
1065         }
1066       }
1067     }.start();
1068 
1069     System.out.println(" ----- thread already started.."
1070         + nm.getServiceState());
1071 
1072     int waitCount = 0;
1073     while (nm.getServiceState() == STATE.INITED && waitCount++ != 50) {
1074       LOG.info("Waiting for NM to start..");
1075       if (nmStartError != null) {
1076         LOG.error("Error during startup. ", nmStartError);
1077         Assert.fail(nmStartError.getCause().getMessage());
1078       }
1079       Thread.sleep(2000);
1080     }
1081     if (nm.getServiceState() != STATE.STARTED) {
1082       // NM could have failed.
1083       Assert.fail("NodeManager failed to start");
1084     }
1085 
1086     waitCount = 0;
1087     while (heartBeatID <= 3 && waitCount++ != 200) {
1088       Thread.sleep(1000);
1089     }
1090     Assert.assertFalse(heartBeatID <= 3);
1091     Assert.assertEquals("Number of registered NMs is wrong!!", 1,
1092         this.registeredNodes.size());
1093 
1094     nm.stop();
1095   }
1096 
1097   @Test
testStopReentrant()1098   public void testStopReentrant() throws Exception {
1099     final AtomicInteger numCleanups = new AtomicInteger(0);
1100     nm = new NodeManager() {
1101       @Override
1102       protected NodeStatusUpdater createNodeStatusUpdater(Context context,
1103           Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
1104         MyNodeStatusUpdater myNodeStatusUpdater = new MyNodeStatusUpdater(
1105             context, dispatcher, healthChecker, metrics);
1106         MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2();
1107         myResourceTracker2.heartBeatNodeAction = NodeAction.SHUTDOWN;
1108         myNodeStatusUpdater.resourceTracker = myResourceTracker2;
1109         return myNodeStatusUpdater;
1110       }
1111 
1112       @Override
1113       protected ContainerManagerImpl createContainerManager(Context context,
1114           ContainerExecutor exec, DeletionService del,
1115           NodeStatusUpdater nodeStatusUpdater,
1116           ApplicationACLsManager aclsManager,
1117           LocalDirsHandlerService dirsHandler) {
1118         return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
1119             metrics, aclsManager, dirsHandler) {
1120 
1121           @Override
1122           public void cleanUpApplicationsOnNMShutDown() {
1123             super.cleanUpApplicationsOnNMShutDown();
1124             numCleanups.incrementAndGet();
1125           }
1126         };
1127       }
1128     };
1129 
1130     YarnConfiguration conf = createNMConfig();
1131     nm.init(conf);
1132     nm.start();
1133 
1134     int waitCount = 0;
1135     while (heartBeatID < 1 && waitCount++ != 200) {
1136       Thread.sleep(500);
1137     }
1138     Assert.assertFalse(heartBeatID < 1);
1139 
1140     // Meanwhile call stop directly as the shutdown hook would
1141     nm.stop();
1142 
1143     // NM takes a while to reach the STOPPED state.
1144     waitCount = 0;
1145     while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
1146       LOG.info("Waiting for NM to stop..");
1147       Thread.sleep(1000);
1148     }
1149 
1150     Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
1151     Assert.assertEquals(numCleanups.get(), 1);
1152   }
1153 
1154   @Test
1155   public void testNodeDecommision() throws Exception {
1156     nm = getNodeManager(NodeAction.SHUTDOWN);
1157     YarnConfiguration conf = createNMConfig();
1158     nm.init(conf);
1159     Assert.assertEquals(STATE.INITED, nm.getServiceState());
1160     nm.start();
1161 
1162     int waitCount = 0;
1163     while (heartBeatID < 1 && waitCount++ != 200) {
1164       Thread.sleep(500);
1165     }
1166     Assert.assertFalse(heartBeatID < 1);
1167     Assert.assertTrue(nm.getNMContext().getDecommissioned());
1168 
1169     // NM takes a while to reach the STOPPED state.
1170     waitCount = 0;
1171     while (nm.getServiceState() != STATE.STOPPED && waitCount++ != 20) {
1172       LOG.info("Waiting for NM to stop..");
1173       Thread.sleep(1000);
1174     }
1175 
1176     Assert.assertEquals(STATE.STOPPED, nm.getServiceState());
1177   }
1178 
1179   private abstract class NodeManagerWithCustomNodeStatusUpdater extends NodeManager {
1180     private NodeStatusUpdater updater;
1181 
1182     private NodeManagerWithCustomNodeStatusUpdater() {
1183     }
1184 
1185     @Override
1186     protected NodeStatusUpdater createNodeStatusUpdater(Context context,
1187                                                         Dispatcher dispatcher,
1188                                                         NodeHealthCheckerService healthChecker) {
1189       updater = createUpdater(context, dispatcher, healthChecker);
1190       return updater;
1191     }
1192 
1193     public NodeStatusUpdater getUpdater() {
1194       return updater;
1195     }
1196 
1197     abstract NodeStatusUpdater createUpdater(Context context,
1198                                                        Dispatcher dispatcher,
1199                                                        NodeHealthCheckerService healthChecker);
1200   }
1201 
1202   @Test
1203   public void testNMShutdownForRegistrationFailure() throws Exception {
1204 
1205     nm = new NodeManagerWithCustomNodeStatusUpdater() {
1206       @Override
1207       protected NodeStatusUpdater createUpdater(Context context,
1208           Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
1209         MyNodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater(
1210             context, dispatcher, healthChecker, metrics);
1211         MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2();
1212         myResourceTracker2.registerNodeAction = NodeAction.SHUTDOWN;
1213         myResourceTracker2.shutDownMessage = "RM Shutting Down Node";
1214         nodeStatusUpdater.resourceTracker = myResourceTracker2;
1215         return nodeStatusUpdater;
1216       }
1217     };
1218     verifyNodeStartFailure(
1219           "Recieved SHUTDOWN signal from Resourcemanager ,"
1220         + "Registration of NodeManager failed, "
1221         + "Message from ResourceManager: RM Shutting Down Node");
1222   }
1223 
  /**
   * Exercises the NM's retry loop when connecting to the RM, in two phases.
   * <ol>
   *   <li>With an RM that never comes up, start() must fail. It must fail only
   *   after retrying for at least {@code connectionWaitMs}, and within
   *   {@code connectionWaitMs + delta}.</li>
   *   <li>With an RM that comes up after {@code rmStartIntervalMS}, start()
   *   must succeed. The connection must happen no earlier than that interval
   *   and within {@code rmStartIntervalMS + delta}.</li>
   * </ol>
   * NOTE(review): the RM simulation lives in MyNodeStatusUpdater4 (not shown
   * here); presumably it uses MyResourceTracker6 — confirm.
   */
  @Test (timeout = 150000)
  public void testNMConnectionToRM() throws Exception {
    // Slack allowed on top of each expected wait.
    final long delta = 50000;
    final long connectionWaitMs = 5000;
    final long connectionRetryIntervalMs = 1000;
    //Waiting for rmStartIntervalMS, RM will be started
    final long rmStartIntervalMS = 2*1000;
    conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
        connectionWaitMs);
    conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
        connectionRetryIntervalMs);

    //Test that the NM tries to connect to the RM several times but finally fails
    NodeManagerWithCustomNodeStatusUpdater nmWithUpdater;
    nm = nmWithUpdater = new NodeManagerWithCustomNodeStatusUpdater() {
      @Override
      protected NodeStatusUpdater createUpdater(Context context,
          Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
        // rmNeverStart = true: the simulated RM never accepts the NM.
        NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater4(
            context, dispatcher, healthChecker, metrics,
            rmStartIntervalMS, true);
        return nodeStatusUpdater;
      }
    };
    nm.init(conf);
    long waitStartTime = System.currentTimeMillis();
    try {
      nm.start();
      // Assert.fail throws an AssertionError (an Error, not an Exception), so
      // it escapes the catch(Exception) below and fails the test.
      Assert.fail("NM should have failed to start due to RM connect failure");
    } catch(Exception e) {
      long t = System.currentTimeMillis();
      long duration = t - waitStartTime;
      boolean waitTimeValid = (duration >= connectionWaitMs)
              && (duration < (connectionWaitMs + delta));
      if(!waitTimeValid) {
        //either the exception was too early, or it had a different cause.
        //reject with the inner stack trace
        throw new Exception("NM should have tried re-connecting to RM during " +
          "period of at least " + connectionWaitMs + " ms, but " +
          "stopped retrying within " + (connectionWaitMs + delta) +
          " ms: " + e, e);
      }
    }

    //Test that the NM fails its first few connection attempts to the RM,
    //but finally succeeds.
    nm = nmWithUpdater = new NodeManagerWithCustomNodeStatusUpdater() {
      @Override
      protected NodeStatusUpdater createUpdater(Context context,
          Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
        // rmNeverStart = false: the simulated RM comes up after the interval.
        NodeStatusUpdater nodeStatusUpdater = new MyNodeStatusUpdater4(
            context, dispatcher, healthChecker, metrics, rmStartIntervalMS,
            false);
        return nodeStatusUpdater;
      }
    };
    nm.init(conf);
    // The updater must already exist after init() so it can be inspected.
    NodeStatusUpdater updater = nmWithUpdater.getUpdater();
    Assert.assertNotNull("Updater not yet created ", updater);
    waitStartTime = System.currentTimeMillis();
    try {
      nm.start();
    } catch (Exception ex){
      LOG.error("NM should have started successfully " +
                "after connecting to RM.", ex);
      throw ex;
    }
    long duration = System.currentTimeMillis() - waitStartTime;
    MyNodeStatusUpdater4 myUpdater = (MyNodeStatusUpdater4) updater;
    Assert.assertTrue("NM started before updater triggered",
                      myUpdater.isTriggered());
    Assert.assertTrue("NM should have connected to RM after "
        +"the start interval of " + rmStartIntervalMS
        +": actual " + duration
        + " " + myUpdater,
        (duration >= rmStartIntervalMS));
    Assert.assertTrue("NM should have connected to RM less than "
        + (rmStartIntervalMS + delta)
        +" milliseconds of RM starting up: actual " + duration
        + " " + myUpdater,
        (duration < (rmStartIntervalMS + delta)));
    // NOTE(review): the NM started above is not stopped in this method;
    // presumably a teardown elsewhere stops the shared nm field — confirm.
  }
1306 
1307   /**
1308    * Verifies that if for some reason NM fails to start ContainerManager RPC
1309    * server, RM is oblivious to NM's presence. The behaviour is like this
1310    * because otherwise, NM will report to RM even if all its servers are not
1311    * started properly, RM will think that the NM is alive and will retire the NM
1312    * only after NM_EXPIRY interval. See MAPREDUCE-2749.
1313    */
1314   @Test
1315   public void testNoRegistrationWhenNMServicesFail() throws Exception {
1316 
1317     nm = new NodeManager() {
1318       @Override
1319       protected NodeStatusUpdater createNodeStatusUpdater(Context context,
1320           Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
1321         return new MyNodeStatusUpdater(context, dispatcher, healthChecker,
1322                                        metrics);
1323       }
1324 
1325       @Override
1326       protected ContainerManagerImpl createContainerManager(Context context,
1327           ContainerExecutor exec, DeletionService del,
1328           NodeStatusUpdater nodeStatusUpdater,
1329           ApplicationACLsManager aclsManager,
1330           LocalDirsHandlerService diskhandler) {
1331         return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
1332           metrics, aclsManager, diskhandler) {
1333           @Override
1334           protected void serviceStart() {
1335             // Simulating failure of starting RPC server
1336             throw new YarnRuntimeException("Starting of RPC Server failed");
1337           }
1338         };
1339       }
1340     };
1341 
1342     verifyNodeStartFailure("Starting of RPC Server failed");
1343   }
1344 
1345   @Test
1346   public void testApplicationKeepAlive() throws Exception {
1347     MyNodeManager nm = new MyNodeManager();
1348     try {
1349       YarnConfiguration conf = createNMConfig();
1350       conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
1351       conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
1352           4000l);
1353       nm.init(conf);
1354       nm.start();
1355       // HB 2 -> app cancelled by RM.
1356       while (heartBeatID < 12) {
1357         Thread.sleep(1000l);
1358       }
1359       MyResourceTracker3 rt =
1360           (MyResourceTracker3) nm.getNodeStatusUpdater().getRMClient();
1361       rt.context.getApplications().remove(rt.appId);
1362       Assert.assertEquals(1, rt.keepAliveRequests.size());
1363       int numKeepAliveRequests = rt.keepAliveRequests.get(rt.appId).size();
1364       LOG.info("Number of Keep Alive Requests: [" + numKeepAliveRequests + "]");
1365       Assert.assertTrue(numKeepAliveRequests == 2 || numKeepAliveRequests == 3);
1366       while (heartBeatID < 20) {
1367         Thread.sleep(1000l);
1368       }
1369       int numKeepAliveRequests2 = rt.keepAliveRequests.get(rt.appId).size();
1370       Assert.assertEquals(numKeepAliveRequests, numKeepAliveRequests2);
1371     } finally {
1372       if (nm.getServiceState() == STATE.STARTED)
1373         nm.stop();
1374     }
1375   }
1376 
1377   /**
1378    * Test completed containerStatus get back up when heart beat lost, and will
1379    * be sent via next heart beat.
1380    */
1381   @Test(timeout = 200000)
1382   public void testCompletedContainerStatusBackup() throws Exception {
1383     nm = new NodeManager() {
1384       @Override
1385       protected NodeStatusUpdater createNodeStatusUpdater(Context context,
1386           Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
1387         MyNodeStatusUpdater2 myNodeStatusUpdater =
1388             new MyNodeStatusUpdater2(context, dispatcher, healthChecker,
1389                 metrics);
1390         return myNodeStatusUpdater;
1391       }
1392 
1393       @Override
1394       protected NMContext createNMContext(
1395           NMContainerTokenSecretManager containerTokenSecretManager,
1396           NMTokenSecretManagerInNM nmTokenSecretManager,
1397           NMStateStoreService store) {
1398         return new MyNMContext(containerTokenSecretManager,
1399           nmTokenSecretManager);
1400       }
1401     };
1402 
1403     YarnConfiguration conf = createNMConfig();
1404     nm.init(conf);
1405     nm.start();
1406 
1407     int waitCount = 0;
1408     while (heartBeatID <= 4 && waitCount++ != 20) {
1409       Thread.sleep(500);
1410     }
1411     if (heartBeatID <= 4) {
1412       Assert.fail("Failed to get all heartbeats in time, " +
1413           "heartbeatID:" + heartBeatID);
1414     }
1415     if(assertionFailedInThread.get()) {
1416       Assert.fail("ContainerStatus Backup failed");
1417     }
1418     Assert.assertNotNull(nm.getNMContext().getSystemCredentialsForApps()
1419       .get(ApplicationId.newInstance(1234, 1)).getToken(new Text("token1")));
1420     nm.stop();
1421   }
1422 
1423   @Test(timeout = 200000)
1424   public void testNodeStatusUpdaterRetryAndNMShutdown()
1425       throws Exception {
1426     final long connectionWaitSecs = 1000;
1427     final long connectionRetryIntervalMs = 1000;
1428     YarnConfiguration conf = createNMConfig();
1429     conf.setLong(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS,
1430         connectionWaitSecs);
1431     conf.setLong(YarnConfiguration
1432             .RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS,
1433         connectionRetryIntervalMs);
1434     conf.setLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS, 5000);
1435     conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
1436     CyclicBarrier syncBarrier = new CyclicBarrier(2);
1437     nm = new MyNodeManager2(syncBarrier, conf);
1438     nm.init(conf);
1439     nm.start();
1440     // start a container
1441     ContainerId cId = TestNodeManagerShutdown.createContainerId();
1442     FileContext localFS = FileContext.getLocalFSFileContext();
1443     TestNodeManagerShutdown.startContainer(nm, cId, localFS, nmLocalDir,
1444       new File("start_file.txt"));
1445 
1446     try {
1447       syncBarrier.await(10000, TimeUnit.MILLISECONDS);
1448     } catch (Exception e) {
1449     }
1450     Assert.assertFalse("Containers not cleaned up when NM stopped",
1451       assertionFailedInThread.get());
1452     Assert.assertTrue(((MyNodeManager2) nm).isStopped);
1453     Assert.assertTrue("calculate heartBeatCount based on" +
1454         " connectionWaitSecs and RetryIntervalSecs", heartBeatID == 2);
1455   }
1456 
1457   @Test
1458   public void testRMVersionLessThanMinimum() throws InterruptedException {
1459     final AtomicInteger numCleanups = new AtomicInteger(0);
1460     YarnConfiguration conf = createNMConfig();
1461     conf.set(YarnConfiguration.NM_RESOURCEMANAGER_MINIMUM_VERSION, "3.0.0");
1462     nm = new NodeManager() {
1463       @Override
1464       protected NodeStatusUpdater createNodeStatusUpdater(Context context,
1465                                                           Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
1466         MyNodeStatusUpdater myNodeStatusUpdater = new MyNodeStatusUpdater(
1467             context, dispatcher, healthChecker, metrics);
1468         MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2();
1469         myResourceTracker2.heartBeatNodeAction = NodeAction.NORMAL;
1470         myResourceTracker2.rmVersion = "3.0.0";
1471         myNodeStatusUpdater.resourceTracker = myResourceTracker2;
1472         return myNodeStatusUpdater;
1473       }
1474 
1475       @Override
1476       protected ContainerManagerImpl createContainerManager(Context context,
1477           ContainerExecutor exec, DeletionService del,
1478           NodeStatusUpdater nodeStatusUpdater,
1479           ApplicationACLsManager aclsManager,
1480           LocalDirsHandlerService dirsHandler) {
1481         return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
1482             metrics, aclsManager, dirsHandler) {
1483 
1484           @Override
1485           public void cleanUpApplicationsOnNMShutDown() {
1486             super.cleanUpApplicationsOnNMShutDown();
1487             numCleanups.incrementAndGet();
1488           }
1489         };
1490       }
1491     };
1492 
1493     nm.init(conf);
1494     nm.start();
1495 
1496     // NM takes a while to reach the STARTED state.
1497     int waitCount = 0;
1498     while (nm.getServiceState() != STATE.STARTED && waitCount++ != 20) {
1499       LOG.info("Waiting for NM to stop..");
1500       Thread.sleep(1000);
1501     }
1502     Assert.assertTrue(nm.getServiceState() == STATE.STARTED);
1503     nm.stop();
1504   }
1505 
1506   @Test
1507   public void testConcurrentAccessToSystemCredentials(){
1508     final Map<ApplicationId, ByteBuffer> testCredentials = new HashMap<>();
1509     ByteBuffer byteBuffer = ByteBuffer.wrap(new byte[300]);
1510     ApplicationId applicationId = ApplicationId.newInstance(123456, 120);
1511     testCredentials.put(applicationId, byteBuffer);
1512 
1513     final List<Throwable> exceptions = Collections.synchronizedList(new
1514         ArrayList<Throwable>());
1515 
1516     final int NUM_THREADS = 10;
1517     final CountDownLatch allDone = new CountDownLatch(NUM_THREADS);
1518     final ExecutorService threadPool = Executors.newFixedThreadPool(
1519         NUM_THREADS);
1520 
1521     final AtomicBoolean stop = new AtomicBoolean(false);
1522 
1523     try {
1524       for (int i = 0; i < NUM_THREADS; i++) {
1525         threadPool.submit(new Runnable() {
1526           @Override
1527           public void run() {
1528             try {
1529               for (int i = 0; i < 100 && !stop.get(); i++) {
1530                 NodeHeartbeatResponse nodeHeartBeatResponse =
1531                     newNodeHeartbeatResponse(0, NodeAction.NORMAL,
1532                         null, null, null, null, 0);
1533                 nodeHeartBeatResponse.setSystemCredentialsForApps(
1534                     testCredentials);
1535                 NodeHeartbeatResponseProto proto =
1536                     ((NodeHeartbeatResponsePBImpl)nodeHeartBeatResponse)
1537                         .getProto();
1538                 Assert.assertNotNull(proto);
1539               }
1540             } catch (Throwable t) {
1541               exceptions.add(t);
1542               stop.set(true);
1543             } finally {
1544               allDone.countDown();
1545             }
1546           }
1547         });
1548       }
1549 
1550       int testTimeout = 2;
1551       Assert.assertTrue("Timeout waiting for more than " + testTimeout + " " +
1552               "seconds",
1553           allDone.await(testTimeout, TimeUnit.SECONDS));
1554     } catch (InterruptedException ie) {
1555       exceptions.add(ie);
1556     } finally {
1557       threadPool.shutdownNow();
1558     }
1559     Assert.assertTrue("Test failed with exception(s)" + exceptions,
1560         exceptions.isEmpty());
1561   }
1562 
  /**
   * NMContext whose container map changes as heartbeats progress, so that the
   * status updater sees new running and completed containers over time.
   *
   * <p>It is an inner (non-static) class because {@link #getContainers()}
   * reads the enclosing test's {@code heartBeatID}.
   */
  private class MyNMContext extends NMContext {

    /**
     * Delegates to {@link NMContext}, passing null for the third and fourth
     * constructor arguments and an {@link NMNullStateStoreService} as the
     * state store.
     */
    public MyNMContext(
        NMContainerTokenSecretManager containerTokenSecretManager,
        NMTokenSecretManagerInNM nmTokenSecretManager) {
      super(containerTokenSecretManager, nmTokenSecretManager, null, null,
          new NMNullStateStoreService());
    }

    /**
     * Returns the inherited {@code containers} map after adjusting it for the
     * current {@code heartBeatID}:
     * <ul>
     *   <li>0: unchanged;</li>
     *   <li>1: adds container 2 (RUNNING) and container 3 (COMPLETE);</li>
     *   <li>2: adds container 4 (RUNNING) and container 5 (COMPLETE);</li>
     *   <li>3 or 4: unchanged;</li>
     *   <li>anything else: cleared.</li>
     * </ul>
     * Re-adding during the same heartbeat replaces the entries with the same
     * ids.
     *
     * <p>NOTE(review): {@code heartBeatID} is re-read for each comparison. If
     * another thread can update it mid-call, the branch taken may not match a
     * single snapshot. TODO confirm that it is only mutated on the heartbeat
     * thread.
     */
    @Override
    public ConcurrentMap<ContainerId, Container> getContainers() {
      if (heartBeatID == 0) {
        return containers;
      } else if (heartBeatID == 1) {
        ContainerStatus containerStatus2 =
            createContainerStatus(2, ContainerState.RUNNING);
        putMockContainer(containerStatus2);

        ContainerStatus containerStatus3 =
            createContainerStatus(3, ContainerState.COMPLETE);
        putMockContainer(containerStatus3);
        return containers;
      } else if (heartBeatID == 2) {
        ContainerStatus containerStatus4 =
            createContainerStatus(4, ContainerState.RUNNING);
        putMockContainer(containerStatus4);

        ContainerStatus containerStatus5 =
            createContainerStatus(5, ContainerState.COMPLETE);
        putMockContainer(containerStatus5);
        return containers;
      } else if (heartBeatID == 3 || heartBeatID == 4) {
        return containers;
      } else {
        containers.clear();
        return containers;
      }
    }

    /**
     * Stores a mock container for {@code containerStatus}, replacing any entry
     * with the same container id. Also registers a mock {@link Application}
     * for the container's application id if none is present yet.
     */
    private void putMockContainer(ContainerStatus containerStatus) {
      Container container = getMockContainer(containerStatus);
      containers.put(containerStatus.getContainerId(), container);
      applications.putIfAbsent(containerStatus.getContainerId()
          .getApplicationAttemptId().getApplicationId(),
          mock(Application.class));
    }
  }
1611 
1612   public static ContainerStatus createContainerStatus(int id,
1613       ContainerState containerState) {
1614     ApplicationId applicationId = ApplicationId.newInstance(0, 1);
1615     ApplicationAttemptId applicationAttemptId =
1616         ApplicationAttemptId.newInstance(applicationId, 1);
1617     ContainerId contaierId = ContainerId.newContainerId(applicationAttemptId, id);
1618     ContainerStatus containerStatus =
1619         BuilderUtils.newContainerStatus(contaierId, containerState,
1620           "test_containerStatus: id=" + id + ", containerState: "
1621               + containerState, 0);
1622     return containerStatus;
1623   }
1624 
1625   public static Container getMockContainer(ContainerStatus containerStatus) {
1626     ContainerImpl container = mock(ContainerImpl.class);
1627     when(container.cloneAndGetContainerStatus()).thenReturn(containerStatus);
1628     when(container.getCurrentState()).thenReturn(containerStatus.getState());
1629     when(container.getContainerId()).thenReturn(
1630       containerStatus.getContainerId());
1631     if (containerStatus.getState().equals(ContainerState.COMPLETE)) {
1632       when(container.getContainerState())
1633         .thenReturn(org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.DONE);
1634     } else if (containerStatus.getState().equals(ContainerState.RUNNING)) {
1635       when(container.getContainerState())
1636       .thenReturn(org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState.RUNNING);
1637     }
1638     return container;
1639   }
1640 
1641   private void verifyNodeStartFailure(String errMessage) throws Exception {
1642     Assert.assertNotNull("nm is null", nm);
1643     YarnConfiguration conf = createNMConfig();
1644     nm.init(conf);
1645     try {
1646       nm.start();
1647       Assert.fail("NM should have failed to start. Didn't get exception!!");
1648     } catch (Exception e) {
1649       //the version in trunk looked in the cause for equality
1650       // and assumed failures were nested.
1651       //this version assumes that error strings propagate to the base and
1652       //use a contains() test only. It should be less brittle
1653       if(!e.getMessage().contains(errMessage)) {
1654         throw e;
1655       }
1656     }
1657 
1658     // the service should be stopped
1659     Assert.assertEquals("NM state is wrong!", STATE.STOPPED, nm
1660         .getServiceState());
1661 
1662     Assert.assertEquals("Number of registered nodes is wrong!", 0,
1663         this.registeredNodes.size());
1664   }
1665 
1666   private YarnConfiguration createNMConfig() {
1667     YarnConfiguration conf = new YarnConfiguration();
1668     String localhostAddress = null;
1669     try {
1670       localhostAddress = InetAddress.getByName("localhost").getCanonicalHostName();
1671     } catch (UnknownHostException e) {
1672       Assert.fail("Unable to get localhost address: " + e.getMessage());
1673     }
1674     conf.setInt(YarnConfiguration.NM_PMEM_MB, 5 * 1024); // 5GB
1675     conf.set(YarnConfiguration.NM_ADDRESS, localhostAddress + ":12345");
1676     conf.set(YarnConfiguration.NM_LOCALIZER_ADDRESS, localhostAddress + ":12346");
1677     conf.set(YarnConfiguration.NM_LOG_DIRS, logsDir.getAbsolutePath());
1678     conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
1679       remoteLogsDir.getAbsolutePath());
1680     conf.set(YarnConfiguration.NM_LOCAL_DIRS, nmLocalDir.getAbsolutePath());
1681     conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 1);
1682     return conf;
1683   }
1684 
1685   private NodeManager getNodeManager(final NodeAction nodeHeartBeatAction) {
1686     return new NodeManager() {
1687       @Override
1688       protected NodeStatusUpdater createNodeStatusUpdater(Context context,
1689           Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
1690         MyNodeStatusUpdater myNodeStatusUpdater = new MyNodeStatusUpdater(
1691             context, dispatcher, healthChecker, metrics);
1692         MyResourceTracker2 myResourceTracker2 = new MyResourceTracker2();
1693         myResourceTracker2.heartBeatNodeAction = nodeHeartBeatAction;
1694         myNodeStatusUpdater.resourceTracker = myResourceTracker2;
1695         return myNodeStatusUpdater;
1696       }
1697     };
1698   }
1699 }
1700