1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 package org.apache.hadoop.yarn.server.timeline; 19 20 import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.writeReverseOrderedLong; 21 import static org.junit.Assert.assertEquals; 22 import static org.junit.Assert.assertNotNull; 23 24 import java.io.File; 25 import java.io.IOException; 26 import java.util.Collections; 27 import java.util.List; 28 import java.util.Map; 29 import java.util.Set; 30 31 import org.apache.hadoop.classification.InterfaceAudience; 32 import org.apache.hadoop.classification.InterfaceStability; 33 import org.apache.hadoop.conf.Configuration; 34 import org.apache.hadoop.fs.FileContext; 35 import org.apache.hadoop.fs.FileStatus; 36 import org.apache.hadoop.fs.FileSystem; 37 import org.apache.hadoop.fs.Path; 38 import org.apache.hadoop.io.IOUtils; 39 import org.apache.hadoop.service.ServiceStateException; 40 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities; 41 import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity; 42 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse; 43 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; 44 import org.apache.hadoop.yarn.conf.YarnConfiguration; 45 import org.apache.hadoop.yarn.server.records.Version; 46 import org.apache.hadoop.yarn.server.timeline.LeveldbTimelineStore; 47 import org.apache.hadoop.yarn.server.timeline.NameValuePair; 48 import org.apache.hadoop.yarn.server.utils.LeveldbIterator; 49 import org.iq80.leveldb.DBException; 50 import org.junit.After; 51 import org.junit.Assert; 52 import org.junit.Before; 53 import org.junit.Test; 54 55 @InterfaceAudience.Private 56 @InterfaceStability.Unstable 57 public class TestLeveldbTimelineStore extends TimelineStoreTestUtils { 58 private FileContext fsContext; 59 private File fsPath; 60 private Configuration config = new YarnConfiguration(); 61 62 @Before setup()63 public void setup() throws Exception { 64 fsContext = FileContext.getLocalFSFileContext(); 65 fsPath = new File("target", this.getClass().getSimpleName() + 66 "-tmpDir").getAbsoluteFile(); 67 fsContext.delete(new Path(fsPath.getAbsolutePath()), true); 68 config.set(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH, 69 fsPath.getAbsolutePath()); 70 config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, false); 71 store = new LeveldbTimelineStore(); 72 store.init(config); 73 store.start(); 74 loadTestEntityData(); 75 loadVerificationEntityData(); 76 loadTestDomainData(); 77 } 78 79 @After tearDown()80 public void tearDown() throws Exception { 81 store.stop(); 82 fsContext.delete(new Path(fsPath.getAbsolutePath()), true); 83 } 84 85 @Test testRootDirPermission()86 public void testRootDirPermission() throws IOException { 87 FileSystem fs = FileSystem.getLocal(new YarnConfiguration()); 88 FileStatus file = fs.getFileStatus( 89 new Path(fsPath.getAbsolutePath(), LeveldbTimelineStore.FILENAME)); 90 assertNotNull(file); 91 assertEquals(LeveldbTimelineStore.LEVELDB_DIR_UMASK, file.getPermission()); 92 } 93 94 @Test testGetSingleEntity()95 public void testGetSingleEntity() throws IOException { 96 super.testGetSingleEntity(); 97 ((LeveldbTimelineStore)store).clearStartTimeCache(); 98 super.testGetSingleEntity(); 99 loadTestEntityData(); 100 } 101 102 @Test testGetEntities()103 public void testGetEntities() throws IOException { 104 super.testGetEntities(); 105 } 106 107 @Test testGetEntitiesWithFromId()108 public void testGetEntitiesWithFromId() throws IOException { 109 super.testGetEntitiesWithFromId(); 110 } 111 112 @Test testGetEntitiesWithFromTs()113 public void testGetEntitiesWithFromTs() throws IOException { 114 super.testGetEntitiesWithFromTs(); 115 } 116 117 @Test testGetEntitiesWithPrimaryFilters()118 public void testGetEntitiesWithPrimaryFilters() throws IOException { 119 super.testGetEntitiesWithPrimaryFilters(); 120 } 121 122 @Test testGetEntitiesWithSecondaryFilters()123 public void testGetEntitiesWithSecondaryFilters() throws IOException { 124 super.testGetEntitiesWithSecondaryFilters(); 125 } 126 127 @Test testGetEvents()128 public void testGetEvents() throws IOException { 129 super.testGetEvents(); 130 } 131 132 @Test testCacheSizes()133 public void testCacheSizes() { 134 Configuration conf = new Configuration(); 135 assertEquals(10000, LeveldbTimelineStore.getStartTimeReadCacheSize(conf)); 136 assertEquals(10000, LeveldbTimelineStore.getStartTimeWriteCacheSize(conf)); 137 conf.setInt( 138 YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE, 139 10001); 140 assertEquals(10001, LeveldbTimelineStore.getStartTimeReadCacheSize(conf)); 141 conf = new Configuration(); 142 conf.setInt( 143 YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE, 144 10002); 145 assertEquals(10002, LeveldbTimelineStore.getStartTimeWriteCacheSize(conf)); 146 } 147 deleteNextEntity(String entityType, byte[] ts)148 private boolean deleteNextEntity(String entityType, byte[] ts) 149 throws IOException, InterruptedException { 150 LeveldbIterator iterator = null; 151 LeveldbIterator pfIterator = null; 152 try { 153 iterator = ((LeveldbTimelineStore)store).getDbIterator(false); 154 pfIterator = ((LeveldbTimelineStore)store).getDbIterator(false); 155 return ((LeveldbTimelineStore)store).deleteNextEntity(entityType, ts, 156 iterator, pfIterator, false); 157 } catch(DBException e) { 158 throw new IOException(e); 159 } finally { 160 IOUtils.cleanup(null, iterator, pfIterator); 161 } 162 } 163 164 @Test testGetEntityTypes()165 public void testGetEntityTypes() throws IOException { 166 List<String> entityTypes = ((LeveldbTimelineStore)store).getEntityTypes(); 167 assertEquals(7, entityTypes.size()); 168 assertEquals("ACL_ENTITY_TYPE_1", entityTypes.get(0)); 169 assertEquals("OLD_ENTITY_TYPE_1", entityTypes.get(1)); 170 assertEquals(entityType1, entityTypes.get(2)); 171 assertEquals(entityType2, entityTypes.get(3)); 172 assertEquals(entityType4, entityTypes.get(4)); 173 assertEquals(entityType5, entityTypes.get(5)); 174 } 175 176 @Test testDeleteEntities()177 public void testDeleteEntities() throws IOException, InterruptedException { 178 assertEquals(3, getEntities("type_1").size()); 179 assertEquals(1, getEntities("type_2").size()); 180 181 assertEquals(false, deleteNextEntity(entityType1, 182 writeReverseOrderedLong(60l))); 183 assertEquals(3, getEntities("type_1").size()); 184 assertEquals(1, getEntities("type_2").size()); 185 186 assertEquals(true, deleteNextEntity(entityType1, 187 writeReverseOrderedLong(123l))); 188 List<TimelineEntity> entities = getEntities("type_2"); 189 assertEquals(1, entities.size()); 190 verifyEntityInfo(entityId2, entityType2, events2, Collections.singletonMap( 191 entityType1, Collections.singleton(entityId1b)), EMPTY_PRIMARY_FILTERS, 192 EMPTY_MAP, entities.get(0), domainId1); 193 entities = getEntitiesWithPrimaryFilter("type_1", userFilter); 194 assertEquals(2, entities.size()); 195 verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, 196 primaryFilters, otherInfo, entities.get(0), domainId1); 197 // can retrieve entities across domains 198 verifyEntityInfo(entityId6, entityType1, EMPTY_EVENTS, EMPTY_REL_ENTITIES, 199 primaryFilters, otherInfo, entities.get(1), domainId2); 200 201 ((LeveldbTimelineStore)store).discardOldEntities(-123l); 202 assertEquals(2, getEntities("type_1").size()); 203 assertEquals(0, getEntities("type_2").size()); 204 assertEquals(6, ((LeveldbTimelineStore)store).getEntityTypes().size()); 205 206 ((LeveldbTimelineStore)store).discardOldEntities(123l); 207 assertEquals(0, getEntities("type_1").size()); 208 assertEquals(0, getEntities("type_2").size()); 209 assertEquals(0, ((LeveldbTimelineStore)store).getEntityTypes().size()); 210 assertEquals(0, getEntitiesWithPrimaryFilter("type_1", userFilter).size()); 211 } 212 213 @Test testDeleteEntitiesPrimaryFilters()214 public void testDeleteEntitiesPrimaryFilters() 215 throws IOException, InterruptedException { 216 Map<String, Set<Object>> primaryFilter = 217 Collections.singletonMap("user", Collections.singleton( 218 (Object) "otheruser")); 219 TimelineEntities atsEntities = new TimelineEntities(); 220 atsEntities.setEntities(Collections.singletonList(createEntity(entityId1b, 221 entityType1, 789l, Collections.singletonList(ev2), null, primaryFilter, 222 null, domainId1))); 223 TimelinePutResponse response = store.put(atsEntities); 224 assertEquals(0, response.getErrors().size()); 225 226 NameValuePair pfPair = new NameValuePair("user", "otheruser"); 227 List<TimelineEntity> entities = getEntitiesWithPrimaryFilter("type_1", 228 pfPair); 229 assertEquals(1, entities.size()); 230 verifyEntityInfo(entityId1b, entityType1, Collections.singletonList(ev2), 231 EMPTY_REL_ENTITIES, primaryFilter, EMPTY_MAP, entities.get(0), 232 domainId1); 233 234 entities = getEntitiesWithPrimaryFilter("type_1", userFilter); 235 assertEquals(3, entities.size()); 236 verifyEntityInfo(entityId1, entityType1, events1, EMPTY_REL_ENTITIES, 237 primaryFilters, otherInfo, entities.get(0), domainId1); 238 verifyEntityInfo(entityId1b, entityType1, events1, EMPTY_REL_ENTITIES, 239 primaryFilters, otherInfo, entities.get(1), domainId1); 240 verifyEntityInfo(entityId6, entityType1, EMPTY_EVENTS, EMPTY_REL_ENTITIES, 241 primaryFilters, otherInfo, entities.get(2), domainId2); 242 243 ((LeveldbTimelineStore)store).discardOldEntities(-123l); 244 assertEquals(1, getEntitiesWithPrimaryFilter("type_1", pfPair).size()); 245 assertEquals(3, getEntitiesWithPrimaryFilter("type_1", userFilter).size()); 246 247 ((LeveldbTimelineStore)store).discardOldEntities(123l); 248 assertEquals(0, getEntities("type_1").size()); 249 assertEquals(0, getEntities("type_2").size()); 250 assertEquals(0, ((LeveldbTimelineStore)store).getEntityTypes().size()); 251 252 assertEquals(0, getEntitiesWithPrimaryFilter("type_1", pfPair).size()); 253 assertEquals(0, getEntitiesWithPrimaryFilter("type_1", userFilter).size()); 254 } 255 256 @Test testFromTsWithDeletion()257 public void testFromTsWithDeletion() 258 throws IOException, InterruptedException { 259 long l = System.currentTimeMillis(); 260 assertEquals(3, getEntitiesFromTs("type_1", l).size()); 261 assertEquals(1, getEntitiesFromTs("type_2", l).size()); 262 assertEquals(3, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter, 263 l).size()); 264 ((LeveldbTimelineStore)store).discardOldEntities(123l); 265 assertEquals(0, getEntitiesFromTs("type_1", l).size()); 266 assertEquals(0, getEntitiesFromTs("type_2", l).size()); 267 assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter, 268 l).size()); 269 assertEquals(0, getEntities("type_1").size()); 270 assertEquals(0, getEntities("type_2").size()); 271 assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter, 272 l).size()); 273 loadTestEntityData(); 274 assertEquals(0, getEntitiesFromTs("type_1", l).size()); 275 assertEquals(0, getEntitiesFromTs("type_2", l).size()); 276 assertEquals(0, getEntitiesFromTsWithPrimaryFilter("type_1", userFilter, 277 l).size()); 278 assertEquals(3, getEntities("type_1").size()); 279 assertEquals(1, getEntities("type_2").size()); 280 assertEquals(3, getEntitiesWithPrimaryFilter("type_1", userFilter).size()); 281 } 282 283 @Test testCheckVersion()284 public void testCheckVersion() throws IOException { 285 LeveldbTimelineStore dbStore = (LeveldbTimelineStore) store; 286 // default version 287 Version defaultVersion = dbStore.getCurrentVersion(); 288 Assert.assertEquals(defaultVersion, dbStore.loadVersion()); 289 290 // compatible version 291 Version compatibleVersion = 292 Version.newInstance(defaultVersion.getMajorVersion(), 293 defaultVersion.getMinorVersion() + 2); 294 dbStore.storeVersion(compatibleVersion); 295 Assert.assertEquals(compatibleVersion, dbStore.loadVersion()); 296 restartTimelineStore(); 297 dbStore = (LeveldbTimelineStore) store; 298 // overwrite the compatible version 299 Assert.assertEquals(defaultVersion, dbStore.loadVersion()); 300 301 // incompatible version 302 Version incompatibleVersion = 303 Version.newInstance(defaultVersion.getMajorVersion() + 1, 304 defaultVersion.getMinorVersion()); 305 dbStore.storeVersion(incompatibleVersion); 306 try { 307 restartTimelineStore(); 308 Assert.fail("Incompatible version, should expect fail here."); 309 } catch (ServiceStateException e) { 310 Assert.assertTrue("Exception message mismatch", 311 e.getMessage().contains("Incompatible version for timeline store")); 312 } 313 } 314 315 @Test testValidateConfig()316 public void testValidateConfig() throws IOException { 317 Configuration copyConfig = new YarnConfiguration(config); 318 try { 319 Configuration newConfig = new YarnConfiguration(copyConfig); 320 newConfig.setLong(YarnConfiguration.TIMELINE_SERVICE_TTL_MS, 0); 321 config = newConfig; 322 restartTimelineStore(); 323 Assert.fail(); 324 } catch (IllegalArgumentException e) { 325 Assert.assertTrue(e.getMessage().contains( 326 YarnConfiguration.TIMELINE_SERVICE_TTL_MS)); 327 } 328 try { 329 Configuration newConfig = new YarnConfiguration(copyConfig); 330 newConfig.setLong( 331 YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS, 0); 332 config = newConfig; 333 restartTimelineStore(); 334 Assert.fail(); 335 } catch (IllegalArgumentException e) { 336 Assert.assertTrue(e.getMessage().contains( 337 YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS)); 338 } 339 try { 340 Configuration newConfig = new YarnConfiguration(copyConfig); 341 newConfig.setLong( 342 YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE, -1); 343 config = newConfig; 344 restartTimelineStore(); 345 Assert.fail(); 346 } catch (IllegalArgumentException e) { 347 Assert.assertTrue(e.getMessage().contains( 348 YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE)); 349 } 350 try { 351 Configuration newConfig = new YarnConfiguration(copyConfig); 352 newConfig 353 .setLong( 354 YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE, 355 0); 356 config = newConfig; 357 restartTimelineStore(); 358 Assert.fail(); 359 } catch (IllegalArgumentException e) { 360 Assert 361 .assertTrue(e 362 .getMessage().contains( 363 YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE)); 364 } 365 try { 366 Configuration newConfig = new YarnConfiguration(copyConfig); 367 newConfig 368 .setLong( 369 YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE, 370 0); 371 config = newConfig; 372 restartTimelineStore(); 373 Assert.fail(); 374 } catch (IllegalArgumentException e) { 375 Assert 376 .assertTrue(e 377 .getMessage() 378 .contains( 379 YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE)); 380 } 381 config = copyConfig; 382 restartTimelineStore(); 383 } 384 restartTimelineStore()385 private void restartTimelineStore() throws IOException { 386 // need to close so leveldb releases database lock 387 if (store != null) { 388 store.close(); 389 } 390 store = new LeveldbTimelineStore(); 391 store.init(config); 392 store.start(); 393 } 394 395 @Test testGetDomain()396 public void testGetDomain() throws IOException { 397 super.testGetDomain(); 398 } 399 400 @Test testGetDomains()401 public void testGetDomains() throws IOException { 402 super.testGetDomains(); 403 } 404 405 @Test testRelatingToNonExistingEntity()406 public void testRelatingToNonExistingEntity() throws IOException { 407 TimelineEntity entityToStore = new TimelineEntity(); 408 entityToStore.setEntityType("TEST_ENTITY_TYPE_1"); 409 entityToStore.setEntityId("TEST_ENTITY_ID_1"); 410 entityToStore.setDomainId(TimelineDataManager.DEFAULT_DOMAIN_ID); 411 entityToStore.addRelatedEntity("TEST_ENTITY_TYPE_2", "TEST_ENTITY_ID_2"); 412 TimelineEntities entities = new TimelineEntities(); 413 entities.addEntity(entityToStore); 414 store.put(entities); 415 TimelineEntity entityToGet = 416 store.getEntity("TEST_ENTITY_ID_2", "TEST_ENTITY_TYPE_2", null); 417 Assert.assertNotNull(entityToGet); 418 Assert.assertEquals("DEFAULT", entityToGet.getDomainId()); 419 Assert.assertEquals("TEST_ENTITY_TYPE_1", 420 entityToGet.getRelatedEntities().keySet().iterator().next()); 421 Assert.assertEquals("TEST_ENTITY_ID_1", 422 entityToGet.getRelatedEntities().values().iterator().next() 423 .iterator().next()); 424 } 425 426 @Test testRelatingToOldEntityWithoutDomainId()427 public void testRelatingToOldEntityWithoutDomainId() throws IOException { 428 // New entity is put in the default domain 429 TimelineEntity entityToStore = new TimelineEntity(); 430 entityToStore.setEntityType("NEW_ENTITY_TYPE_1"); 431 entityToStore.setEntityId("NEW_ENTITY_ID_1"); 432 entityToStore.setDomainId(TimelineDataManager.DEFAULT_DOMAIN_ID); 433 entityToStore.addRelatedEntity("OLD_ENTITY_TYPE_1", "OLD_ENTITY_ID_1"); 434 TimelineEntities entities = new TimelineEntities(); 435 entities.addEntity(entityToStore); 436 store.put(entities); 437 438 TimelineEntity entityToGet = 439 store.getEntity("OLD_ENTITY_ID_1", "OLD_ENTITY_TYPE_1", null); 440 Assert.assertNotNull(entityToGet); 441 Assert.assertNull(entityToGet.getDomainId()); 442 Assert.assertEquals("NEW_ENTITY_TYPE_1", 443 entityToGet.getRelatedEntities().keySet().iterator().next()); 444 Assert.assertEquals("NEW_ENTITY_ID_1", 445 entityToGet.getRelatedEntities().values().iterator().next() 446 .iterator().next()); 447 448 // New entity is not put in the default domain 449 entityToStore = new TimelineEntity(); 450 entityToStore.setEntityType("NEW_ENTITY_TYPE_2"); 451 entityToStore.setEntityId("NEW_ENTITY_ID_2"); 452 entityToStore.setDomainId("NON_DEFAULT"); 453 entityToStore.addRelatedEntity("OLD_ENTITY_TYPE_1", "OLD_ENTITY_ID_1"); 454 entities = new TimelineEntities(); 455 entities.addEntity(entityToStore); 456 TimelinePutResponse response = store.put(entities); 457 Assert.assertEquals(1, response.getErrors().size()); 458 Assert.assertEquals(TimelinePutError.FORBIDDEN_RELATION, 459 response.getErrors().get(0).getErrorCode()); 460 entityToGet = 461 store.getEntity("OLD_ENTITY_ID_1", "OLD_ENTITY_TYPE_1", null); 462 Assert.assertNotNull(entityToGet); 463 Assert.assertNull(entityToGet.getDomainId()); 464 // Still have one related entity 465 Assert.assertEquals(1, entityToGet.getRelatedEntities().keySet().size()); 466 Assert.assertEquals(1, entityToGet.getRelatedEntities().values() 467 .iterator().next().size()); 468 } 469 470 } 471