1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements.  See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership.  The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License.  You may obtain a copy of the License at
9 *
10 *     http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 
19 package org.apache.hadoop.yarn.client.api;
20 
21 import java.util.concurrent.ConcurrentHashMap;
22 
23 import org.apache.hadoop.classification.InterfaceAudience.Private;
24 import org.apache.hadoop.classification.InterfaceAudience.Public;
25 import org.apache.hadoop.classification.InterfaceStability.Evolving;
26 import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
27 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
28 import org.apache.hadoop.yarn.api.records.Token;
29 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
30 import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
31 
32 import com.google.common.annotations.VisibleForTesting;
33 
34 /**
35  * NMTokenCache manages NMTokens required for an Application Master
36  * communicating with individual NodeManagers.
37  * <p>
38  * By default Yarn client libraries {@link AMRMClient} and {@link NMClient} use
39  * {@link #getSingleton()} instance of the cache.
40  * <ul>
41  *   <li>
42  *     Using the singleton instance of the cache is appropriate when running a
43  *     single ApplicationMaster in the same JVM.
44  *   </li>
45  *   <li>
46  *     When using the singleton, users don't need to do anything special,
47  *     {@link AMRMClient} and {@link NMClient} are already set up to use the
48  *     default singleton {@link NMTokenCache}
49  *     </li>
50  * </ul>
51  * If running multiple Application Masters in the same JVM, a different cache
52  * instance should be used for each Application Master.
53  * <ul>
54  *   <li>
55  *     If using the {@link AMRMClient} and the {@link NMClient}, setting up
56  *     and using an instance cache is as follows:
57  * <pre>
58  *   NMTokenCache nmTokenCache = new NMTokenCache();
59  *   AMRMClient rmClient = AMRMClient.createAMRMClient();
60  *   NMClient nmClient = NMClient.createNMClient();
61  *   nmClient.setNMTokenCache(nmTokenCache);
62  *   ...
63  * </pre>
64  *   </li>
65  *   <li>
66  *     If using the {@link AMRMClientAsync} and the {@link NMClientAsync},
67  *     setting up and using an instance cache is as follows:
68  * <pre>
69  *   NMTokenCache nmTokenCache = new NMTokenCache();
70  *   AMRMClient rmClient = AMRMClient.createAMRMClient();
71  *   NMClient nmClient = NMClient.createNMClient();
72  *   nmClient.setNMTokenCache(nmTokenCache);
73  *   AMRMClientAsync rmClientAsync = new AMRMClientAsync(rmClient, 1000, [AMRM_CALLBACK]);
74  *   NMClientAsync nmClientAsync = new NMClientAsync("nmClient", nmClient, [NM_CALLBACK]);
75  *   ...
76  * </pre>
77  *   </li>
78  *   <li>
79  *     If using {@link ApplicationMasterProtocol} and
80  *     {@link ContainerManagementProtocol} directly, setting up and using an
81  *     instance cache is as follows:
82  * <pre>
83  *   NMTokenCache nmTokenCache = new NMTokenCache();
84  *   ...
85  *   ApplicationMasterProtocol amPro = ClientRMProxy.createRMProxy(conf, ApplicationMasterProtocol.class);
86  *   ...
87  *   AllocateRequest allocateRequest = ...
88  *   ...
89  *   AllocateResponse allocateResponse = rmClient.allocate(allocateRequest);
90  *   for (NMToken token : allocateResponse.getNMTokens()) {
91  *     nmTokenCache.setToken(token.getNodeId().toString(), token.getToken());
92  *   }
93  *   ...
94  *   ContainerManagementProtocolProxy nmPro = ContainerManagementProtocolProxy(conf, nmTokenCache);
95  *   ...
96  *   nmPro.startContainer(container, containerContext);
97  *   ...
98  * </pre>
99  *   </li>
100  * </ul>
101  * It is also possible to mix the usage of a client ({@code AMRMClient} or
102  * {@code NMClient}, or the async versions of them) with a protocol proxy
103  * ({@code ContainerManagementProtocolProxy} or
104  * {@code ApplicationMasterProtocol}).
105  */
106 @Public
107 @Evolving
108 public class NMTokenCache {
109   private static final NMTokenCache NM_TOKEN_CACHE = new NMTokenCache();
110 
111   /**
112    * Returns the singleton NM token cache.
113    *
114    * @return the singleton NM token cache.
115    */
getSingleton()116   public static NMTokenCache getSingleton() {
117     return NM_TOKEN_CACHE;
118   }
119 
120   /**
121    * Returns NMToken, null if absent. Only the singleton obtained from
122    * {@link #getSingleton()} is looked at for the tokens. If you are using your
123    * own NMTokenCache that is different from the singleton, use
124    * {@link #getToken(String) }
125    *
126    * @param nodeAddr
127    * @return {@link Token} NMToken required for communicating with node manager
128    */
129   @Public
getNMToken(String nodeAddr)130   public static Token getNMToken(String nodeAddr) {
131     return NM_TOKEN_CACHE.getToken(nodeAddr);
132   }
133 
134   /**
135    * Sets the NMToken for node address only in the singleton obtained from
136    * {@link #getSingleton()}. If you are using your own NMTokenCache that is
137    * different from the singleton, use {@link #setToken(String, Token) }
138    *
139    * @param nodeAddr
140    *          node address (host:port)
141    * @param token
142    *          NMToken
143    */
144   @Public
setNMToken(String nodeAddr, Token token)145   public static void setNMToken(String nodeAddr, Token token) {
146     NM_TOKEN_CACHE.setToken(nodeAddr, token);
147   }
148 
149   private ConcurrentHashMap<String, Token> nmTokens;
150 
151   /**
152    * Creates a NM token cache instance.
153    */
NMTokenCache()154   public NMTokenCache() {
155     nmTokens = new ConcurrentHashMap<String, Token>();
156   }
157 
158   /**
159    * Returns NMToken, null if absent
160    * @param nodeAddr
161    * @return {@link Token} NMToken required for communicating with node
162    *         manager
163    */
164   @Public
165   @Evolving
getToken(String nodeAddr)166   public Token getToken(String nodeAddr) {
167     return nmTokens.get(nodeAddr);
168   }
169 
170   /**
171    * Sets the NMToken for node address
172    * @param nodeAddr node address (host:port)
173    * @param token NMToken
174    */
175   @Public
176   @Evolving
setToken(String nodeAddr, Token token)177   public void setToken(String nodeAddr, Token token) {
178     nmTokens.put(nodeAddr, token);
179   }
180 
181   /**
182    * Returns true if NMToken is present in cache.
183    */
184   @Private
185   @VisibleForTesting
containsToken(String nodeAddr)186   public boolean containsToken(String nodeAddr) {
187     return nmTokens.containsKey(nodeAddr);
188   }
189 
190   /**
191    * Returns the number of NMTokens present in cache.
192    */
193   @Private
194   @VisibleForTesting
numberOfTokensInCache()195   public int numberOfTokensInCache() {
196     return nmTokens.size();
197   }
198 
199   /**
200    * Removes NMToken for specified node manager
201    * @param nodeAddr node address (host:port)
202    */
203   @Private
204   @VisibleForTesting
removeToken(String nodeAddr)205   public void removeToken(String nodeAddr) {
206     nmTokens.remove(nodeAddr);
207   }
208 
209   /**
210    * It will remove all the nm tokens from its cache
211    */
212   @Private
213   @VisibleForTesting
clearCache()214   public void clearCache() {
215     nmTokens.clear();
216   }
217 }
218