/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
18 package org.apache.hadoop.yarn.server.timeline;
19 
20 import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.writeReverseOrderedLong;
21 import static org.junit.Assert.assertEquals;
22 import static org.junit.Assert.assertNotNull;
23 
24 import java.io.File;
25 import java.io.IOException;
26 import java.util.Collections;
27 import java.util.List;
28 import java.util.Map;
29 import java.util.Set;
30 
31 import org.apache.hadoop.classification.InterfaceAudience;
32 import org.apache.hadoop.classification.InterfaceStability;
33 import org.apache.hadoop.conf.Configuration;
34 import org.apache.hadoop.fs.FileContext;
35 import org.apache.hadoop.fs.FileStatus;
36 import org.apache.hadoop.fs.FileSystem;
37 import org.apache.hadoop.fs.Path;
38 import org.apache.hadoop.io.IOUtils;
39 import org.apache.hadoop.service.ServiceStateException;
40 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
41 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
42 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
43 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
44 import org.apache.hadoop.yarn.conf.YarnConfiguration;
45 import org.apache.hadoop.yarn.server.records.Version;
46 import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore;
47 import org.apache.hadoop.yarn.server.timeline.NameValuePair;
48 import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
49 import org.iq80.leveldb.DBException;
50 import org.junit.After;
51 import org.junit.Assert;
52 import org.junit.Before;
53 import org.junit.Test;
54 
55 @InterfaceAudience.Private
56 @InterfaceStability.Unstable
57 public class TestLeveldbTimelineStore extends TimelineStoreTestUtils {
58   private FileContext fsContext;
59   private File fsPath;
60   private Configuration config = new YarnConfiguration();
61 
62   @Before
setup()63   public void setup() throws Exception {
64     fsContext = FileContext.getLocalFSFileContext();
65     fsPath = new File("target", this.getClass().getSimpleName() +
66         "-tmpDir").getAbsoluteFile();
67     fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
68     config.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH,
69         fsPath.getAbsolutePath());
70     config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false);
71     store = new LeveldbTimelineStore();
72     store.init(config);
73     store.start();
74     loadTestEntityData();
75     loadVerificationEntityData();
76     loadTestDomainData();
77   }
78 
79   @After
tearDown()80   public void tearDown() throws Exception {
81     store.stop();
82     fsContext.delete(new Path(fsPath.getAbsolutePath()), true);
83   }
84 
85   @Test
testRootDirPermission()86   public void testRootDirPermission() throws IOException {
87     FileSystem fs = FileSystem.getLocal(new YarnConfiguration());
88     FileStatus file = fs.getFileStatus(
89         new Path(fsPath.getAbsolutePath(), LeveldbTimelineStore.FILENAME));
90     assertNotNull(file);
91     assertEquals(LeveldbTimelineStore.LEVELDB_DIR_UMASK, file.getPermission());
92   }
93 
94   @Test
testGetSingleEntity()95   public void testGetSingleEntity() throws IOException {
96     super.testGetSingleEntity();
97     ((LeveldbTimelineStore)store).clearStartTimeCache();
98     super.testGetSingleEntity();
99     loadTestEntityData();
100   }
101 
102   @Test
testGetEntities()103   public void testGetEntities() throws IOException {
104     super.testGetEntities();
105   }
106 
107   @Test
testGetEntitiesWithFromId()108   public void testGetEntitiesWithFromId() throws IOException {
109     super.testGetEntitiesWithFromId();
110   }
111 
112   @Test
testGetEntitiesWithFromTs()113   public void testGetEntitiesWithFromTs() throws IOException {
114     super.testGetEntitiesWithFromTs();
115   }
116 
117   @Test
testGetEntitiesWithPrimaryFilters()118   public void testGetEntitiesWithPrimaryFilters() throws IOException {
119     super.testGetEntitiesWithPrimaryFilters();
120   }
121 
122   @Test
testGetEntitiesWithSecondaryFilters()123   public void testGetEntitiesWithSecondaryFilters() throws IOException {
124     super.testGetEntitiesWithSecondaryFilters();
125   }
126 
127   @Test
testGetEvents()128   public void testGetEvents() throws IOException {
129     super.testGetEvents();
130   }
131 
132   @Test
testCacheSizes()133   public void testCacheSizes() {
134     Configuration conf = new Configuration();
135     assertEquals(10000, LeveldbTimelineStore.getStartTimeReadCacheSize(conf));
136     assertEquals(10000, LeveldbTimelineStore.getStartTimeWriteCacheSize(conf));
137     conf.setInt(
138         YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
139         10001);
140     assertEquals(10001, LeveldbTimelineStore.getStartTimeReadCacheSize(conf));
141     conf = new Configuration();
142     conf.setInt(
143         YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
144         10002);
145     assertEquals(10002, LeveldbTimelineStore.getStartTimeWriteCacheSize(conf));
146   }
147 
deleteNextEntity(String entityType, byte[] ts)148   private boolean deleteNextEntity(String entityType, byte[] ts)
149       throws IOException, InterruptedException {
150     LeveldbIterator iterator = null;
151     LeveldbIterator pfIterator = null;
152     try {
153       iterator = ((LeveldbTimelineStore)store).getDbIterator(false);
154       pfIterator = ((LeveldbTimelineStore)store).getDbIterator(false);
155       return ((LeveldbTimelineStore)store).deleteNextEntity(entityType, ts,
156           iterator, pfIterator, false);
157     } catch(DBException e) {
158       throw new IOException(e);
159     } finally {
160       IOUtils.cleanup(null, iterator, pfIterator);
161     }
162   }
163 
164   @Test
testGetEntityTypes()165   public void testGetEntityTypes() throws IOException {
166     List<String> entityTypes = ((LeveldbTimelineStore)store).getEntityTypes();
167     assertEquals(7, entityTypes.size());
168     assertEquals("ACL_ENTITY_TYPE_1", entityTypes.get(0));
169     assertEquals("OLD_ENTITY_TYPE_1", entityTypes.get(1));
170     assertEquals(entityType1, entityTypes.get(2));
171     assertEquals(entityType2, entityTypes.get(3));
172     assertEquals(entityType4, entityTypes.get(4));
173     assertEquals(entityType5, entityTypes.get(5));
174   }
175 
176   @Test
testDeleteEntities()177   public void testDeleteEntities() throws IOException, InterruptedException {
178     assertEquals(3, getEntities("type_1").size());
179     assertEquals(1, getEntities("type_2").size());
180 
181     assertEquals(false, deleteNextEntity(entityType1,
182         writeReverseOrderedLong(60l)));
183     assertEquals(3, getEntities("type_1").size());
184     assertEquals(1, getEntities("type_2").size());
185 
186     assertEquals(true, deleteNextEntity(entityType1,
187         writeReverseOrderedLong(123l)));
188     List<TimelineEntity> entities = getEntities("type_2");
189     assertEquals(1, entities.size());
190     verifyEntityInfo(entityId2, entityType2, events2, Collections.singletonMap(
191         entityType1, Collections.singleton(entityId1b)), EMPTY_PRIMARY_FILTERS,
192         EMPTY_MAP, entities.get(0), domainId1);
193     entities = getEntitiesWithPrimaryFilter("type_1", userFilter);
194     assertEquals(2, entities.size());
195     verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
196         primaryFilters, otherInfo, entities.get(0), domainId1);
197     // can retrieve entities across domains
198     verifyEntityInfo(entityId6, entityType1, EMPTY_EVENTS, EMPTY_REL_ENTITIES,
199         primaryFilters, otherInfo, entities.get(1), domainId2);
200 
201     ((LeveldbTimelineStore)store).discardOldEntities(-123l);
202     assertEquals(2, getEntities("type_1").size());
203     assertEquals(0, getEntities("type_2").size());
204     assertEquals(6, ((LeveldbTimelineStore)store).getEntityTypes().size());
205 
206     ((LeveldbTimelineStore)store).discardOldEntities(123l);
207     assertEquals(0, getEntities("type_1").size());
208     assertEquals(0, getEntities("type_2").size());
209     assertEquals(0, ((LeveldbTimelineStore)store).getEntityTypes().size());
210     assertEquals(0, getEntitiesWithPrimaryFilter("type_1", userFilter).size());
211   }
212 
213   @Test
testDeleteEntitiesPrimaryFilters()214   public void testDeleteEntitiesPrimaryFilters()
215       throws IOException, InterruptedException {
216     Map<String, Set<Object>> primaryFilter =
217         Collections.singletonMap("user", Collections.singleton(
218             (Object) "otheruser"));
219     TimelineEntities atsEntities = new TimelineEntities();
220     atsEntities.setEntities(Collections.singletonList(createEntity(entityId1b,
221         entityType1, 789l, Collections.singletonList(ev2), null, primaryFilter,
222         null, domainId1)));
223     TimelinePutResponse response = store.put(atsEntities);
224     assertEquals(0, response.getErrors().size());
225 
226     NameValuePair pfPair = new NameValuePair("user", "otheruser");
227     List<TimelineEntity> entities = getEntitiesWithPrimaryFilter("type_1",
228         pfPair);
229     assertEquals(1, entities.size());
230     verifyEntityInfo(entityId1b, entityType1, Collections.singletonList(ev2),
231         EMPTY_REL_ENTITIES, primaryFilter, EMPTY_MAP, entities.get(0),
232         domainId1);
233 
234     entities = getEntitiesWithPrimaryFilter("type_1", userFilter);
235     assertEquals(3, entities.size());
236     verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES,
237         primaryFilters, otherInfo, entities.get(0), domainId1);
238     verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES,
239         primaryFilters, otherInfo, entities.get(1), domainId1);
240     verifyEntityInfo(entityId6, entityType1, EMPTY_EVENTS, EMPTY_REL_ENTITIES,
241         primaryFilters, otherInfo, entities.get(2), domainId2);
242 
243     ((LeveldbTimelineStore)store).discardOldEntities(-123l);
244     assertEquals(1, getEntitiesWithPrimaryFilter("type_1", pfPair).size());
245     assertEquals(3, getEntitiesWithPrimaryFilter("type_1", userFilter).size());
246 
247     ((LeveldbTimelineStore)store).discardOldEntities(123l);
248     assertEquals(0, getEntities("type_1").size());
249     assertEquals(0, getEntities("type_2").size());
250     assertEquals(0, ((LeveldbTimelineStore)store).getEntityTypes().size());
251 
252     assertEquals(0, getEntitiesWithPrimaryFilter("type_1", pfPair).size());
253     assertEquals(0, getEntitiesWithPrimaryFilter("type_1", userFilter).size());
254   }
255 
256   @Test
testFromTsWithDeletion()257   public void testFromTsWithDeletion()
258       throws IOException, InterruptedException {
259     long l = System.currentTimeMillis();
260     assertEquals(3, getEntitiesFromTs("type_1", l).size());
261     assertEquals(1, getEntitiesFromTs("type_2", l).size());
262     assertEquals(3, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
263         l).size());
264     ((LeveldbTimelineStore)store).discardOldEntities(123l);
265     assertEquals(0, getEntitiesFromTs("type_1", l).size());
266     assertEquals(0, getEntitiesFromTs("type_2", l).size());
267     assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
268         l).size());
269     assertEquals(0, getEntities("type_1").size());
270     assertEquals(0, getEntities("type_2").size());
271     assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
272         l).size());
273     loadTestEntityData();
274     assertEquals(0, getEntitiesFromTs("type_1", l).size());
275     assertEquals(0, getEntitiesFromTs("type_2", l).size());
276     assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter,
277         l).size());
278     assertEquals(3, getEntities("type_1").size());
279     assertEquals(1, getEntities("type_2").size());
280     assertEquals(3, getEntitiesWithPrimaryFilter("type_1", userFilter).size());
281   }
282 
283   @Test
testCheckVersion()284   public void testCheckVersion() throws IOException {
285     LeveldbTimelineStore dbStore = (LeveldbTimelineStore) store;
286     // default version
287     Version defaultVersion = dbStore.getCurrentVersion();
288     Assert.assertEquals(defaultVersion, dbStore.loadVersion());
289 
290     // compatible version
291     Version compatibleVersion =
292         Version.newInstance(defaultVersion.getMajorVersion(),
293           defaultVersion.getMinorVersion() + 2);
294     dbStore.storeVersion(compatibleVersion);
295     Assert.assertEquals(compatibleVersion, dbStore.loadVersion());
296     restartTimelineStore();
297     dbStore = (LeveldbTimelineStore) store;
298     // overwrite the compatible version
299     Assert.assertEquals(defaultVersion, dbStore.loadVersion());
300 
301     // incompatible version
302     Version incompatibleVersion =
303       Version.newInstance(defaultVersion.getMajorVersion() + 1,
304           defaultVersion.getMinorVersion());
305     dbStore.storeVersion(incompatibleVersion);
306     try {
307       restartTimelineStore();
308       Assert.fail("Incompatible version, should expect fail here.");
309     } catch (ServiceStateException e) {
310       Assert.assertTrue("Exception message mismatch",
311         e.getMessage().contains("Incompatible version for timeline store"));
312     }
313   }
314 
315   @Test
testValidateConfig()316   public void testValidateConfig() throws IOException {
317     Configuration copyConfig = new YarnConfiguration(config);
318     try {
319       Configuration newConfig = new YarnConfiguration(copyConfig);
320       newConfig.setLong(YarnConfiguration.TIMELINE_SERVICE_TTL_MS, 0);
321       config = newConfig;
322       restartTimelineStore();
323       Assert.fail();
324     } catch (IllegalArgumentException e) {
325       Assert.assertTrue(e.getMessage().contains(
326           YarnConfiguration.TIMELINE_SERVICE_TTL_MS));
327     }
328     try {
329       Configuration newConfig = new YarnConfiguration(copyConfig);
330       newConfig.setLong(
331           YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS, 0);
332       config = newConfig;
333       restartTimelineStore();
334       Assert.fail();
335     } catch (IllegalArgumentException e) {
336       Assert.assertTrue(e.getMessage().contains(
337           YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS));
338     }
339     try {
340       Configuration newConfig = new YarnConfiguration(copyConfig);
341       newConfig.setLong(
342           YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE, -1);
343       config = newConfig;
344       restartTimelineStore();
345       Assert.fail();
346     } catch (IllegalArgumentException e) {
347       Assert.assertTrue(e.getMessage().contains(
348           YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE));
349     }
350     try {
351       Configuration newConfig = new YarnConfiguration(copyConfig);
352       newConfig
353           .setLong(
354               YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
355               0);
356       config = newConfig;
357       restartTimelineStore();
358       Assert.fail();
359     } catch (IllegalArgumentException e) {
360       Assert
361           .assertTrue(e
362               .getMessage().contains(
363                   YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE));
364     }
365     try {
366       Configuration newConfig = new YarnConfiguration(copyConfig);
367       newConfig
368           .setLong(
369               YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
370               0);
371       config = newConfig;
372       restartTimelineStore();
373       Assert.fail();
374     } catch (IllegalArgumentException e) {
375       Assert
376           .assertTrue(e
377               .getMessage()
378               .contains(
379                   YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE));
380     }
381     config = copyConfig;
382     restartTimelineStore();
383   }
384 
restartTimelineStore()385   private void restartTimelineStore() throws IOException {
386     // need to close so leveldb releases database lock
387     if (store != null) {
388       store.close();
389     }
390     store = new LeveldbTimelineStore();
391     store.init(config);
392     store.start();
393   }
394 
395   @Test
testGetDomain()396   public void testGetDomain() throws IOException {
397     super.testGetDomain();
398   }
399 
400   @Test
testGetDomains()401   public void testGetDomains() throws IOException {
402     super.testGetDomains();
403   }
404 
405   @Test
testRelatingToNonExistingEntity()406   public void testRelatingToNonExistingEntity() throws IOException {
407     TimelineEntity entityToStore = new TimelineEntity();
408     entityToStore.setEntityType("TEST_ENTITY_TYPE_1");
409     entityToStore.setEntityId("TEST_ENTITY_ID_1");
410     entityToStore.setDomainId(TimelineDataManager.DEFAULT_DOMAIN_ID);
411     entityToStore.addRelatedEntity("TEST_ENTITY_TYPE_2", "TEST_ENTITY_ID_2");
412     TimelineEntities entities = new TimelineEntities();
413     entities.addEntity(entityToStore);
414     store.put(entities);
415     TimelineEntity entityToGet =
416         store.getEntity("TEST_ENTITY_ID_2", "TEST_ENTITY_TYPE_2", null);
417     Assert.assertNotNull(entityToGet);
418     Assert.assertEquals("DEFAULT", entityToGet.getDomainId());
419     Assert.assertEquals("TEST_ENTITY_TYPE_1",
420         entityToGet.getRelatedEntities().keySet().iterator().next());
421     Assert.assertEquals("TEST_ENTITY_ID_1",
422         entityToGet.getRelatedEntities().values().iterator().next()
423             .iterator().next());
424   }
425 
426   @Test
testRelatingToOldEntityWithoutDomainId()427   public void testRelatingToOldEntityWithoutDomainId() throws IOException {
428     // New entity is put in the default domain
429     TimelineEntity entityToStore = new TimelineEntity();
430     entityToStore.setEntityType("NEW_ENTITY_TYPE_1");
431     entityToStore.setEntityId("NEW_ENTITY_ID_1");
432     entityToStore.setDomainId(TimelineDataManager.DEFAULT_DOMAIN_ID);
433     entityToStore.addRelatedEntity("OLD_ENTITY_TYPE_1", "OLD_ENTITY_ID_1");
434     TimelineEntities entities = new TimelineEntities();
435     entities.addEntity(entityToStore);
436     store.put(entities);
437 
438     TimelineEntity entityToGet =
439         store.getEntity("OLD_ENTITY_ID_1", "OLD_ENTITY_TYPE_1", null);
440     Assert.assertNotNull(entityToGet);
441     Assert.assertNull(entityToGet.getDomainId());
442     Assert.assertEquals("NEW_ENTITY_TYPE_1",
443         entityToGet.getRelatedEntities().keySet().iterator().next());
444     Assert.assertEquals("NEW_ENTITY_ID_1",
445         entityToGet.getRelatedEntities().values().iterator().next()
446             .iterator().next());
447 
448     // New entity is not put in the default domain
449     entityToStore = new TimelineEntity();
450     entityToStore.setEntityType("NEW_ENTITY_TYPE_2");
451     entityToStore.setEntityId("NEW_ENTITY_ID_2");
452     entityToStore.setDomainId("NON_DEFAULT");
453     entityToStore.addRelatedEntity("OLD_ENTITY_TYPE_1", "OLD_ENTITY_ID_1");
454     entities = new TimelineEntities();
455     entities.addEntity(entityToStore);
456     TimelinePutResponse response = store.put(entities);
457     Assert.assertEquals(1, response.getErrors().size());
458     Assert.assertEquals(TimelinePutError.FORBIDDEN_RELATION,
459         response.getErrors().get(0).getErrorCode());
460     entityToGet =
461         store.getEntity("OLD_ENTITY_ID_1", "OLD_ENTITY_TYPE_1", null);
462     Assert.assertNotNull(entityToGet);
463     Assert.assertNull(entityToGet.getDomainId());
464     // Still have one related entity
465     Assert.assertEquals(1, entityToGet.getRelatedEntities().keySet().size());
466     Assert.assertEquals(1, entityToGet.getRelatedEntities().values()
467         .iterator().next().size());
468   }
469 
470 }
471