1 /** 2 * Licensed to the Apache Software Foundation (ASF) under one 3 * or more contributor license agreements. See the NOTICE file 4 * distributed with this work for additional information 5 * regarding copyright ownership. The ASF licenses this file 6 * to you under the Apache License, Version 2.0 (the 7 * "License"); you may not use this file except in compliance 8 * with the License. You may obtain a copy of the License at 9 * 10 * http://www.apache.org/licenses/LICENSE-2.0 11 * 12 * Unless required by applicable law or agreed to in writing, software 13 * distributed under the License is distributed on an "AS IS" BASIS, 14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 15 * See the License for the specific language governing permissions and 16 * limitations under the License. 17 */ 18 19 package org.apache.hadoop.yarn.server.timeline; 20 21 import com.google.common.annotations.VisibleForTesting; 22 import com.google.common.base.Preconditions; 23 import org.apache.commons.collections.map.LRUMap; 24 import org.apache.commons.logging.Log; 25 import org.apache.commons.logging.LogFactory; 26 import org.apache.hadoop.classification.InterfaceAudience; 27 import org.apache.hadoop.classification.InterfaceAudience.Private; 28 import org.apache.hadoop.classification.InterfaceStability; 29 import org.apache.hadoop.conf.Configuration; 30 import org.apache.hadoop.fs.FileSystem; 31 import org.apache.hadoop.fs.Path; 32 import org.apache.hadoop.fs.permission.FsPermission; 33 import org.apache.hadoop.io.IOUtils; 34 import org.apache.hadoop.io.WritableComparator; 35 import org.apache.hadoop.service.AbstractService; 36 import org.apache.hadoop.yarn.api.records.timeline.*; 37 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity; 38 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError; 39 import org.apache.hadoop.yarn.conf.YarnConfiguration; 40 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto; 41 import org.apache.hadoop.yarn.server.records.Version; 42 import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl; 43 import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl; 44 import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyBuilder; 45 import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyParser; 46 import org.apache.hadoop.yarn.server.utils.LeveldbIterator; 47 import org.fusesource.leveldbjni.JniDBFactory; 48 import org.iq80.leveldb.*; 49 50 import java.io.File; 51 import java.io.IOException; 52 import java.nio.charset.Charset; 53 import java.util.*; 54 import java.util.Map.Entry; 55 import java.util.concurrent.locks.ReentrantLock; 56 import java.util.concurrent.locks.ReentrantReadWriteLock; 57 58 import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong; 59 import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.writeReverseOrderedLong; 60 import static org.apache.hadoop.yarn.server.timeline.TimelineDataManager.DEFAULT_DOMAIN_ID; 61 import static org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.prefixMatches; 62 import static org.fusesource.leveldbjni.JniDBFactory.bytes; 63 64 /** 65 * <p>An implementation of an application timeline store backed by leveldb.</p> 66 * 67 * <p>There are three sections of the db, the start time section, 68 * the entity section, and the indexed entity section.</p> 69 * 70 * <p>The start time section is used to retrieve the unique start time for 71 * a given entity. Its values each contain a start time while its keys are of 72 * the form:</p> 73 * <pre> 74 * START_TIME_LOOKUP_PREFIX + entity type + entity id</pre> 75 * 76 * <p>The entity section is ordered by entity type, then entity start time 77 * descending, then entity ID. There are four sub-sections of the entity 78 * section: events, primary filters, related entities, 79 * and other info. The event entries have event info serialized into their 80 * values. The other info entries have values corresponding to the values of 81 * the other info name/value map for the entry (note the names are contained 82 * in the key). All other entries have empty values. The key structure is as 83 * follows:</p> 84 * <pre> 85 * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id 86 * 87 * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id + 88 * EVENTS_COLUMN + reveventtimestamp + eventtype 89 * 90 * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id + 91 * PRIMARY_FILTERS_COLUMN + name + value 92 * 93 * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id + 94 * OTHER_INFO_COLUMN + name 95 * 96 * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id + 97 * RELATED_ENTITIES_COLUMN + relatedentity type + relatedentity id 98 * 99 * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id + 100 * DOMAIN_ID_COLUMN 101 * 102 * ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id + 103 * INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN + relatedentity type + 104 * relatedentity id</pre> 105 * 106 * <p>The indexed entity section contains a primary filter name and primary 107 * filter value as the prefix. Within a given name/value, entire entity 108 * entries are stored in the same format as described in the entity section 109 * above (below, "key" represents any one of the possible entity entry keys 110 * described above).</p> 111 * <pre> 112 * INDEXED_ENTRY_PREFIX + primaryfilter name + primaryfilter value + 113 * key</pre> 114 */ 115 @InterfaceAudience.Private 116 @InterfaceStability.Unstable 117 public class LeveldbTimelineStore extends AbstractService 118 implements TimelineStore { 119 private static final Log LOG = LogFactory 120 .getLog(LeveldbTimelineStore.class); 121 122 @Private 123 @VisibleForTesting 124 static final String FILENAME = "leveldb-timeline-store.ldb"; 125 126 private static final byte[] START_TIME_LOOKUP_PREFIX = "k".getBytes(Charset.forName("UTF-8")); 127 private static final byte[] ENTITY_ENTRY_PREFIX = "e".getBytes(Charset.forName("UTF-8")); 128 private static final byte[] INDEXED_ENTRY_PREFIX = "i".getBytes(Charset.forName("UTF-8")); 129 130 private static final byte[] EVENTS_COLUMN = "e".getBytes(Charset.forName("UTF-8")); 131 private static final byte[] PRIMARY_FILTERS_COLUMN = "f".getBytes(Charset.forName("UTF-8")); 132 private static final byte[] OTHER_INFO_COLUMN = "i".getBytes(Charset.forName("UTF-8")); 133 private static final byte[] RELATED_ENTITIES_COLUMN = "r".getBytes(Charset.forName("UTF-8")); 134 private static final byte[] INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN = 135 "z".getBytes(Charset.forName("UTF-8")); 136 private static final byte[] DOMAIN_ID_COLUMN = "d".getBytes(Charset.forName("UTF-8")); 137 138 private static final byte[] DOMAIN_ENTRY_PREFIX = "d".getBytes(Charset.forName("UTF-8")); 139 private static final byte[] OWNER_LOOKUP_PREFIX = "o".getBytes(Charset.forName("UTF-8")); 140 private static final byte[] DESCRIPTION_COLUMN = "d".getBytes(Charset.forName("UTF-8")); 141 private static final byte[] OWNER_COLUMN = "o".getBytes(Charset.forName("UTF-8")); 142 private static final byte[] READER_COLUMN = "r".getBytes(Charset.forName("UTF-8")); 143 private static final byte[] WRITER_COLUMN = "w".getBytes(Charset.forName("UTF-8")); 144 private static final byte[] TIMESTAMP_COLUMN = "t".getBytes(Charset.forName("UTF-8")); 145 146 private static final byte[] EMPTY_BYTES = new byte[0]; 147 148 private static final String TIMELINE_STORE_VERSION_KEY = "timeline-store-version"; 149 150 private static final Version CURRENT_VERSION_INFO = Version 151 .newInstance(1, 0); 152 153 @Private 154 @VisibleForTesting 155 static final FsPermission LEVELDB_DIR_UMASK = FsPermission 156 .createImmutable((short) 0700); 157 158 private Map<EntityIdentifier, StartAndInsertTime> startTimeWriteCache; 159 private Map<EntityIdentifier, Long> startTimeReadCache; 160 161 /** 162 * Per-entity locks are obtained when writing. 163 */ 164 private final LockMap<EntityIdentifier> writeLocks = 165 new LockMap<EntityIdentifier>(); 166 167 private final ReentrantReadWriteLock deleteLock = 168 new ReentrantReadWriteLock(); 169 170 private DB db; 171 172 private Thread deletionThread; 173 LeveldbTimelineStore()174 public LeveldbTimelineStore() { 175 super(LeveldbTimelineStore.class.getName()); 176 } 177 178 @Override 179 @SuppressWarnings("unchecked") serviceInit(Configuration conf)180 protected void serviceInit(Configuration conf) throws Exception { 181 Preconditions.checkArgument(conf.getLong( 182 YarnConfiguration.TIMELINE_SERVICE_TTL_MS, 183 YarnConfiguration.DEFAULT_TIMELINE_SERVICE_TTL_MS) > 0, 184 "%s property value should be greater than zero", 185 YarnConfiguration.TIMELINE_SERVICE_TTL_MS); 186 Preconditions.checkArgument(conf.getLong( 187 YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS, 188 YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS) > 0, 189 "%s property value should be greater than zero", 190 YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS); 191 Preconditions.checkArgument(conf.getLong( 192 YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE, 193 YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE) >= 0, 194 "%s property value should be greater than or equal to zero", 195 YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE); 196 Preconditions.checkArgument(conf.getLong( 197 YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE, 198 YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE) > 0, 199 " %s property value should be greater than zero", 200 YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE); 201 Preconditions.checkArgument(conf.getLong( 202 YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE, 203 YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE) > 0, 204 "%s property value should be greater than zero", 205 YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE); 206 207 Options options = new Options(); 208 options.createIfMissing(true); 209 options.cacheSize(conf.getLong( 210 YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE, 211 YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE)); 212 JniDBFactory factory = new JniDBFactory(); 213 Path dbPath = new Path( 214 conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH), FILENAME); 215 FileSystem localFS = null; 216 try { 217 localFS = FileSystem.getLocal(conf); 218 if (!localFS.exists(dbPath)) { 219 if (!localFS.mkdirs(dbPath)) { 220 throw new IOException("Couldn't create directory for leveldb " + 221 "timeline store " + dbPath); 222 } 223 localFS.setPermission(dbPath, LEVELDB_DIR_UMASK); 224 } 225 } finally { 226 IOUtils.cleanup(LOG, localFS); 227 } 228 LOG.info("Using leveldb path " + dbPath); 229 db = factory.open(new File(dbPath.toString()), options); 230 checkVersion(); 231 startTimeWriteCache = 232 Collections.synchronizedMap(new LRUMap(getStartTimeWriteCacheSize( 233 conf))); 234 startTimeReadCache = 235 Collections.synchronizedMap(new LRUMap(getStartTimeReadCacheSize( 236 conf))); 237 238 if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, true)) { 239 deletionThread = new EntityDeletionThread(conf); 240 deletionThread.start(); 241 } 242 243 super.serviceInit(conf); 244 } 245 246 @Override serviceStop()247 protected void serviceStop() throws Exception { 248 if (deletionThread != null) { 249 deletionThread.interrupt(); 250 LOG.info("Waiting for deletion thread to complete its current action"); 251 try { 252 deletionThread.join(); 253 } catch (InterruptedException e) { 254 LOG.warn("Interrupted while waiting for deletion thread to complete," + 255 " closing db now", e); 256 } 257 } 258 IOUtils.cleanup(LOG, db); 259 super.serviceStop(); 260 } 261 262 private static class StartAndInsertTime { 263 final long startTime; 264 final long insertTime; 265 StartAndInsertTime(long startTime, long insertTime)266 public StartAndInsertTime(long startTime, long insertTime) { 267 this.startTime = startTime; 268 this.insertTime = insertTime; 269 } 270 } 271 272 private class EntityDeletionThread extends Thread { 273 private final long ttl; 274 private final long ttlInterval; 275 EntityDeletionThread(Configuration conf)276 public EntityDeletionThread(Configuration conf) { 277 ttl = conf.getLong(YarnConfiguration.TIMELINE_SERVICE_TTL_MS, 278 YarnConfiguration.DEFAULT_TIMELINE_SERVICE_TTL_MS); 279 ttlInterval = conf.getLong( 280 YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS, 281 YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS); 282 LOG.info("Starting deletion thread with ttl " + ttl + " and cycle " + 283 "interval " + ttlInterval); 284 } 285 286 @Override run()287 public void run() { 288 while (true) { 289 long timestamp = System.currentTimeMillis() - ttl; 290 try { 291 discardOldEntities(timestamp); 292 Thread.sleep(ttlInterval); 293 } catch (IOException e) { 294 LOG.error(e); 295 } catch (InterruptedException e) { 296 LOG.info("Deletion thread received interrupt, exiting"); 297 break; 298 } 299 } 300 } 301 } 302 303 private static class LockMap<K> { 304 private static class CountingReentrantLock<K> extends ReentrantLock { 305 private static final long serialVersionUID = 1L; 306 private int count; 307 private K key; 308 CountingReentrantLock(K key)309 CountingReentrantLock(K key) { 310 super(); 311 this.count = 0; 312 this.key = key; 313 } 314 } 315 316 private Map<K, CountingReentrantLock<K>> locks = 317 new HashMap<K, CountingReentrantLock<K>>(); 318 getLock(K key)319 synchronized CountingReentrantLock<K> getLock(K key) { 320 CountingReentrantLock<K> lock = locks.get(key); 321 if (lock == null) { 322 lock = new CountingReentrantLock<K>(key); 323 locks.put(key, lock); 324 } 325 326 lock.count++; 327 return lock; 328 } 329 returnLock(CountingReentrantLock<K> lock)330 synchronized void returnLock(CountingReentrantLock<K> lock) { 331 if (lock.count == 0) { 332 throw new IllegalStateException("Returned lock more times than it " + 333 "was retrieved"); 334 } 335 lock.count--; 336 337 if (lock.count == 0) { 338 locks.remove(lock.key); 339 } 340 } 341 } 342 343 344 @Override getEntity(String entityId, String entityType, EnumSet<Field> fields)345 public TimelineEntity getEntity(String entityId, String entityType, 346 EnumSet<Field> fields) throws IOException { 347 Long revStartTime = getStartTimeLong(entityId, entityType); 348 if (revStartTime == null) { 349 return null; 350 } 351 byte[] prefix = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX) 352 .add(entityType).add(writeReverseOrderedLong(revStartTime)) 353 .add(entityId).getBytesForLookup(); 354 355 LeveldbIterator iterator = null; 356 try { 357 iterator = new LeveldbIterator(db); 358 iterator.seek(prefix); 359 360 return getEntity(entityId, entityType, revStartTime, fields, iterator, 361 prefix, prefix.length); 362 } catch(DBException e) { 363 throw new IOException(e); 364 } finally { 365 IOUtils.cleanup(LOG, iterator); 366 } 367 } 368 369 /** 370 * Read entity from a db iterator. If no information is found in the 371 * specified fields for this entity, return null. 372 */ getEntity(String entityId, String entityType, Long startTime, EnumSet<Field> fields, LeveldbIterator iterator, byte[] prefix, int prefixlen)373 private static TimelineEntity getEntity(String entityId, String entityType, 374 Long startTime, EnumSet<Field> fields, LeveldbIterator iterator, 375 byte[] prefix, int prefixlen) throws IOException { 376 if (fields == null) { 377 fields = EnumSet.allOf(Field.class); 378 } 379 380 TimelineEntity entity = new TimelineEntity(); 381 boolean events = false; 382 boolean lastEvent = false; 383 if (fields.contains(Field.EVENTS)) { 384 events = true; 385 } else if (fields.contains(Field.LAST_EVENT_ONLY)) { 386 lastEvent = true; 387 } else { 388 entity.setEvents(null); 389 } 390 boolean relatedEntities = false; 391 if (fields.contains(Field.RELATED_ENTITIES)) { 392 relatedEntities = true; 393 } else { 394 entity.setRelatedEntities(null); 395 } 396 boolean primaryFilters = false; 397 if (fields.contains(Field.PRIMARY_FILTERS)) { 398 primaryFilters = true; 399 } else { 400 entity.setPrimaryFilters(null); 401 } 402 boolean otherInfo = false; 403 if (fields.contains(Field.OTHER_INFO)) { 404 otherInfo = true; 405 } else { 406 entity.setOtherInfo(null); 407 } 408 409 // iterate through the entity's entry, parsing information if it is part 410 // of a requested field 411 for (; iterator.hasNext(); iterator.next()) { 412 byte[] key = iterator.peekNext().getKey(); 413 if (!prefixMatches(prefix, prefixlen, key)) { 414 break; 415 } 416 if (key.length == prefixlen) { 417 continue; 418 } 419 if (key[prefixlen] == PRIMARY_FILTERS_COLUMN[0]) { 420 if (primaryFilters) { 421 addPrimaryFilter(entity, key, 422 prefixlen + PRIMARY_FILTERS_COLUMN.length); 423 } 424 } else if (key[prefixlen] == OTHER_INFO_COLUMN[0]) { 425 if (otherInfo) { 426 entity.addOtherInfo(parseRemainingKey(key, 427 prefixlen + OTHER_INFO_COLUMN.length), 428 GenericObjectMapper.read(iterator.peekNext().getValue())); 429 } 430 } else if (key[prefixlen] == RELATED_ENTITIES_COLUMN[0]) { 431 if (relatedEntities) { 432 addRelatedEntity(entity, key, 433 prefixlen + RELATED_ENTITIES_COLUMN.length); 434 } 435 } else if (key[prefixlen] == EVENTS_COLUMN[0]) { 436 if (events || (lastEvent && 437 entity.getEvents().size() == 0)) { 438 TimelineEvent event = getEntityEvent(null, key, prefixlen + 439 EVENTS_COLUMN.length, iterator.peekNext().getValue()); 440 if (event != null) { 441 entity.addEvent(event); 442 } 443 } 444 } else if (key[prefixlen] == DOMAIN_ID_COLUMN[0]) { 445 byte[] v = iterator.peekNext().getValue(); 446 String domainId = new String(v, Charset.forName("UTF-8")); 447 entity.setDomainId(domainId); 448 } else { 449 if (key[prefixlen] != 450 INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN[0]) { 451 LOG.warn(String.format("Found unexpected column for entity %s of " + 452 "type %s (0x%02x)", entityId, entityType, key[prefixlen])); 453 } 454 } 455 } 456 457 entity.setEntityId(entityId); 458 entity.setEntityType(entityType); 459 entity.setStartTime(startTime); 460 461 return entity; 462 } 463 464 @Override getEntityTimelines(String entityType, SortedSet<String> entityIds, Long limit, Long windowStart, Long windowEnd, Set<String> eventType)465 public TimelineEvents getEntityTimelines(String entityType, 466 SortedSet<String> entityIds, Long limit, Long windowStart, 467 Long windowEnd, Set<String> eventType) throws IOException { 468 TimelineEvents events = new TimelineEvents(); 469 if (entityIds == null || entityIds.isEmpty()) { 470 return events; 471 } 472 // create a lexicographically-ordered map from start time to entities 473 Map<byte[], List<EntityIdentifier>> startTimeMap = new TreeMap<byte[], 474 List<EntityIdentifier>>(new Comparator<byte[]>() { 475 @Override 476 public int compare(byte[] o1, byte[] o2) { 477 return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0, 478 o2.length); 479 } 480 }); 481 LeveldbIterator iterator = null; 482 try { 483 // look up start times for the specified entities 484 // skip entities with no start time 485 for (String entityId : entityIds) { 486 byte[] startTime = getStartTime(entityId, entityType); 487 if (startTime != null) { 488 List<EntityIdentifier> entities = startTimeMap.get(startTime); 489 if (entities == null) { 490 entities = new ArrayList<EntityIdentifier>(); 491 startTimeMap.put(startTime, entities); 492 } 493 entities.add(new EntityIdentifier(entityId, entityType)); 494 } 495 } 496 for (Entry<byte[], List<EntityIdentifier>> entry : 497 startTimeMap.entrySet()) { 498 // look up the events matching the given parameters (limit, 499 // start time, end time, event types) for entities whose start times 500 // were found and add the entities to the return list 501 byte[] revStartTime = entry.getKey(); 502 for (EntityIdentifier entityIdentifier : entry.getValue()) { 503 EventsOfOneEntity entity = new EventsOfOneEntity(); 504 entity.setEntityId(entityIdentifier.getId()); 505 entity.setEntityType(entityType); 506 events.addEvent(entity); 507 KeyBuilder kb = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX) 508 .add(entityType).add(revStartTime).add(entityIdentifier.getId()) 509 .add(EVENTS_COLUMN); 510 byte[] prefix = kb.getBytesForLookup(); 511 if (windowEnd == null) { 512 windowEnd = Long.MAX_VALUE; 513 } 514 byte[] revts = writeReverseOrderedLong(windowEnd); 515 kb.add(revts); 516 byte[] first = kb.getBytesForLookup(); 517 byte[] last = null; 518 if (windowStart != null) { 519 last = KeyBuilder.newInstance().add(prefix) 520 .add(writeReverseOrderedLong(windowStart)).getBytesForLookup(); 521 } 522 if (limit == null) { 523 limit = DEFAULT_LIMIT; 524 } 525 iterator = new LeveldbIterator(db); 526 for (iterator.seek(first); entity.getEvents().size() < limit && 527 iterator.hasNext(); iterator.next()) { 528 byte[] key = iterator.peekNext().getKey(); 529 if (!prefixMatches(prefix, prefix.length, key) || (last != null && 530 WritableComparator.compareBytes(key, 0, key.length, last, 0, 531 last.length) > 0)) { 532 break; 533 } 534 TimelineEvent event = getEntityEvent(eventType, key, prefix.length, 535 iterator.peekNext().getValue()); 536 if (event != null) { 537 entity.addEvent(event); 538 } 539 } 540 } 541 } 542 } catch(DBException e) { 543 throw new IOException(e); 544 } finally { 545 IOUtils.cleanup(LOG, iterator); 546 } 547 return events; 548 } 549 550 @Override getEntities(String entityType, Long limit, Long windowStart, Long windowEnd, String fromId, Long fromTs, NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters, EnumSet<Field> fields, CheckAcl checkAcl)551 public TimelineEntities getEntities(String entityType, 552 Long limit, Long windowStart, Long windowEnd, String fromId, Long fromTs, 553 NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters, 554 EnumSet<Field> fields, CheckAcl checkAcl) throws IOException { 555 if (primaryFilter == null) { 556 // if no primary filter is specified, prefix the lookup with 557 // ENTITY_ENTRY_PREFIX 558 return getEntityByTime(ENTITY_ENTRY_PREFIX, entityType, limit, 559 windowStart, windowEnd, fromId, fromTs, secondaryFilters, 560 fields, checkAcl); 561 } else { 562 // if a primary filter is specified, prefix the lookup with 563 // INDEXED_ENTRY_PREFIX + primaryFilterName + primaryFilterValue + 564 // ENTITY_ENTRY_PREFIX 565 byte[] base = KeyBuilder.newInstance().add(INDEXED_ENTRY_PREFIX) 566 .add(primaryFilter.getName()) 567 .add(GenericObjectMapper.write(primaryFilter.getValue()), true) 568 .add(ENTITY_ENTRY_PREFIX).getBytesForLookup(); 569 return getEntityByTime(base, entityType, limit, windowStart, windowEnd, 570 fromId, fromTs, secondaryFilters, fields, checkAcl); 571 } 572 } 573 574 /** 575 * Retrieves a list of entities satisfying given parameters. 576 * 577 * @param base A byte array prefix for the lookup 578 * @param entityType The type of the entity 579 * @param limit A limit on the number of entities to return 580 * @param starttime The earliest entity start time to retrieve (exclusive) 581 * @param endtime The latest entity start time to retrieve (inclusive) 582 * @param fromId Retrieve entities starting with this entity 583 * @param fromTs Ignore entities with insert timestamp later than this ts 584 * @param secondaryFilters Filter pairs that the entities should match 585 * @param fields The set of fields to retrieve 586 * @return A list of entities 587 * @throws IOException 588 */ getEntityByTime(byte[] base, String entityType, Long limit, Long starttime, Long endtime, String fromId, Long fromTs, Collection<NameValuePair> secondaryFilters, EnumSet<Field> fields, CheckAcl checkAcl)589 private TimelineEntities getEntityByTime(byte[] base, 590 String entityType, Long limit, Long starttime, Long endtime, 591 String fromId, Long fromTs, Collection<NameValuePair> secondaryFilters, 592 EnumSet<Field> fields, CheckAcl checkAcl) throws IOException { 593 LeveldbIterator iterator = null; 594 try { 595 KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType); 596 // only db keys matching the prefix (base + entity type) will be parsed 597 byte[] prefix = kb.getBytesForLookup(); 598 if (endtime == null) { 599 // if end time is null, place no restriction on end time 600 endtime = Long.MAX_VALUE; 601 } 602 // construct a first key that will be seeked to using end time or fromId 603 byte[] first = null; 604 if (fromId != null) { 605 Long fromIdStartTime = getStartTimeLong(fromId, entityType); 606 if (fromIdStartTime == null) { 607 // no start time for provided id, so return empty entities 608 return new TimelineEntities(); 609 } 610 if (fromIdStartTime <= endtime) { 611 // if provided id's start time falls before the end of the window, 612 // use it to construct the seek key 613 first = kb.add(writeReverseOrderedLong(fromIdStartTime)) 614 .add(fromId).getBytesForLookup(); 615 } 616 } 617 // if seek key wasn't constructed using fromId, construct it using end ts 618 if (first == null) { 619 first = kb.add(writeReverseOrderedLong(endtime)).getBytesForLookup(); 620 } 621 byte[] last = null; 622 if (starttime != null) { 623 // if start time is not null, set a last key that will not be 624 // iterated past 625 last = KeyBuilder.newInstance().add(base).add(entityType) 626 .add(writeReverseOrderedLong(starttime)).getBytesForLookup(); 627 } 628 if (limit == null) { 629 // if limit is not specified, use the default 630 limit = DEFAULT_LIMIT; 631 } 632 633 TimelineEntities entities = new TimelineEntities(); 634 iterator = new LeveldbIterator(db); 635 iterator.seek(first); 636 // iterate until one of the following conditions is met: limit is 637 // reached, there are no more keys, the key prefix no longer matches, 638 // or a start time has been specified and reached/exceeded 639 while (entities.getEntities().size() < limit && iterator.hasNext()) { 640 byte[] key = iterator.peekNext().getKey(); 641 if (!prefixMatches(prefix, prefix.length, key) || (last != null && 642 WritableComparator.compareBytes(key, 0, key.length, last, 0, 643 last.length) > 0)) { 644 break; 645 } 646 // read the start time and entity id from the current key 647 KeyParser kp = new KeyParser(key, prefix.length); 648 Long startTime = kp.getNextLong(); 649 String entityId = kp.getNextString(); 650 651 if (fromTs != null) { 652 long insertTime = readReverseOrderedLong(iterator.peekNext() 653 .getValue(), 0); 654 if (insertTime > fromTs) { 655 byte[] firstKey = key; 656 while (iterator.hasNext() && prefixMatches(firstKey, 657 kp.getOffset(), key)) { 658 iterator.next(); 659 key = iterator.peekNext().getKey(); 660 } 661 continue; 662 } 663 } 664 665 // parse the entity that owns this key, iterating over all keys for 666 // the entity 667 TimelineEntity entity = getEntity(entityId, entityType, startTime, 668 fields, iterator, key, kp.getOffset()); 669 // determine if the retrieved entity matches the provided secondary 670 // filters, and if so add it to the list of entities to return 671 boolean filterPassed = true; 672 if (secondaryFilters != null) { 673 for (NameValuePair filter : secondaryFilters) { 674 Object v = entity.getOtherInfo().get(filter.getName()); 675 if (v == null) { 676 Set<Object> vs = entity.getPrimaryFilters() 677 .get(filter.getName()); 678 if (vs == null || !vs.contains(filter.getValue())) { 679 filterPassed = false; 680 break; 681 } 682 } else if (!v.equals(filter.getValue())) { 683 filterPassed = false; 684 break; 685 } 686 } 687 } 688 if (filterPassed) { 689 if (entity.getDomainId() == null) { 690 entity.setDomainId(DEFAULT_DOMAIN_ID); 691 } 692 if (checkAcl == null || checkAcl.check(entity)) { 693 entities.addEntity(entity); 694 } 695 } 696 } 697 return entities; 698 } catch(DBException e) { 699 throw new IOException(e); 700 } finally { 701 IOUtils.cleanup(LOG, iterator); 702 } 703 } 704 705 /** 706 * Handle error and set it in response. 707 */ handleError(TimelineEntity entity, TimelinePutResponse response, final int errorCode)708 private static void handleError(TimelineEntity entity, TimelinePutResponse response, final int errorCode) { 709 TimelinePutError error = new TimelinePutError(); 710 error.setEntityId(entity.getEntityId()); 711 error.setEntityType(entity.getEntityType()); 712 error.setErrorCode(errorCode); 713 response.addError(error); 714 } 715 716 /** 717 * Put a single entity. If there is an error, add a TimelinePutError to the 718 * given response. 719 */ put(TimelineEntity entity, TimelinePutResponse response, boolean allowEmptyDomainId)720 private void put(TimelineEntity entity, TimelinePutResponse response, 721 boolean allowEmptyDomainId) { 722 LockMap.CountingReentrantLock<EntityIdentifier> lock = 723 writeLocks.getLock(new EntityIdentifier(entity.getEntityId(), 724 entity.getEntityType())); 725 lock.lock(); 726 WriteBatch writeBatch = null; 727 List<EntityIdentifier> relatedEntitiesWithoutStartTimes = 728 new ArrayList<EntityIdentifier>(); 729 byte[] revStartTime = null; 730 Map<String, Set<Object>> primaryFilters = null; 731 try { 732 writeBatch = db.createWriteBatch(); 733 List<TimelineEvent> events = entity.getEvents(); 734 // look up the start time for the entity 735 StartAndInsertTime startAndInsertTime = getAndSetStartTime( 736 entity.getEntityId(), entity.getEntityType(), 737 entity.getStartTime(), events); 738 if (startAndInsertTime == null) { 739 // if no start time is found, add an error and return 740 handleError(entity, response, TimelinePutError.NO_START_TIME); 741 return; 742 } 743 revStartTime = writeReverseOrderedLong(startAndInsertTime 744 .startTime); 745 746 primaryFilters = entity.getPrimaryFilters(); 747 748 // write entity marker 749 byte[] markerKey = createEntityMarkerKey(entity.getEntityId(), 750 entity.getEntityType(), revStartTime); 751 byte[] markerValue = writeReverseOrderedLong(startAndInsertTime 752 .insertTime); 753 writeBatch.put(markerKey, markerValue); 754 writePrimaryFilterEntries(writeBatch, primaryFilters, markerKey, 755 markerValue); 756 757 // write event entries 758 if (events != null && !events.isEmpty()) { 759 for (TimelineEvent event : events) { 760 byte[] revts = writeReverseOrderedLong(event.getTimestamp()); 761 byte[] key = createEntityEventKey(entity.getEntityId(), 762 entity.getEntityType(), revStartTime, revts, 763 event.getEventType()); 764 byte[] value = GenericObjectMapper.write(event.getEventInfo()); 765 writeBatch.put(key, value); 766 writePrimaryFilterEntries(writeBatch, primaryFilters, key, value); 767 } 768 } 769 770 // write related entity entries 771 Map<String, Set<String>> relatedEntities = 772 entity.getRelatedEntities(); 773 if (relatedEntities != null && !relatedEntities.isEmpty()) { 774 for (Entry<String, Set<String>> relatedEntityList : 775 relatedEntities.entrySet()) { 776 String relatedEntityType = relatedEntityList.getKey(); 777 for (String relatedEntityId : relatedEntityList.getValue()) { 778 // invisible "reverse" entries (entity -> related entity) 779 byte[] key = createReverseRelatedEntityKey(entity.getEntityId(), 780 entity.getEntityType(), revStartTime, relatedEntityId, 781 relatedEntityType); 782 writeBatch.put(key, EMPTY_BYTES); 783 // look up start time of related entity 784 byte[] relatedEntityStartTime = getStartTime(relatedEntityId, 785 relatedEntityType); 786 // delay writing the related entity if no start time is found 787 if (relatedEntityStartTime == null) { 788 relatedEntitiesWithoutStartTimes.add( 789 new EntityIdentifier(relatedEntityId, relatedEntityType)); 790 continue; 791 } else { 792 // This is the existing entity 793 byte[] domainIdBytes = db.get(createDomainIdKey( 794 relatedEntityId, relatedEntityType, relatedEntityStartTime)); 795 // The timeline data created by the server before 2.6 won't have 796 // the domain field. We assume this timeline data is in the 797 // default timeline domain. 798 String domainId = null; 799 if (domainIdBytes == null) { 800 domainId = TimelineDataManager.DEFAULT_DOMAIN_ID; 801 } else { 802 domainId = new String(domainIdBytes, Charset.forName("UTF-8")); 803 } 804 if (!domainId.equals(entity.getDomainId())) { 805 // in this case the entity will be put, but the relation will be 806 // ignored 807 handleError(entity, response, TimelinePutError.FORBIDDEN_RELATION); 808 continue; 809 } 810 } 811 // write "forward" entry (related entity -> entity) 812 key = createRelatedEntityKey(relatedEntityId, 813 relatedEntityType, relatedEntityStartTime, 814 entity.getEntityId(), entity.getEntityType()); 815 writeBatch.put(key, EMPTY_BYTES); 816 } 817 } 818 } 819 820 // write primary filter entries 821 if (primaryFilters != null && !primaryFilters.isEmpty()) { 822 for (Entry<String, Set<Object>> primaryFilter : 823 primaryFilters.entrySet()) { 824 for (Object primaryFilterValue : primaryFilter.getValue()) { 825 byte[] key = createPrimaryFilterKey(entity.getEntityId(), 826 entity.getEntityType(), revStartTime, 827 primaryFilter.getKey(), primaryFilterValue); 828 writeBatch.put(key, EMPTY_BYTES); 829 writePrimaryFilterEntries(writeBatch, primaryFilters, key, 830 EMPTY_BYTES); 831 } 832 } 833 } 834 835 // write other info entries 836 Map<String, Object> otherInfo = entity.getOtherInfo(); 837 if (otherInfo != null && !otherInfo.isEmpty()) { 838 for (Entry<String, Object> i : otherInfo.entrySet()) { 839 byte[] key = createOtherInfoKey(entity.getEntityId(), 840 entity.getEntityType(), revStartTime, i.getKey()); 841 byte[] value = GenericObjectMapper.write(i.getValue()); 842 writeBatch.put(key, value); 843 writePrimaryFilterEntries(writeBatch, primaryFilters, key, value); 844 } 845 } 846 847 // write domain id entry 848 byte[] key = createDomainIdKey(entity.getEntityId(), 849 entity.getEntityType(), revStartTime); 850 if (entity.getDomainId() == null || 851 entity.getDomainId().length() == 0) { 852 if (!allowEmptyDomainId) { 853 handleError(entity, response, TimelinePutError.NO_DOMAIN); 854 return; 855 } 856 } else { 857 writeBatch.put(key, entity.getDomainId().getBytes(Charset.forName("UTF-8"))); 858 writePrimaryFilterEntries(writeBatch, primaryFilters, key, 859 entity.getDomainId().getBytes(Charset.forName("UTF-8"))); 860 } 861 db.write(writeBatch); 862 } catch (DBException de) { 863 LOG.error("Error putting entity " + entity.getEntityId() + 864 " of type " + entity.getEntityType(), de); 865 handleError(entity, response, TimelinePutError.IO_EXCEPTION); 866 } catch (IOException e) { 867 LOG.error("Error putting entity " + entity.getEntityId() + 868 " of type " + entity.getEntityType(), e); 869 handleError(entity, response, TimelinePutError.IO_EXCEPTION); 870 } finally { 871 lock.unlock(); 872 writeLocks.returnLock(lock); 873 IOUtils.cleanup(LOG, writeBatch); 874 } 875 876 for (EntityIdentifier relatedEntity : relatedEntitiesWithoutStartTimes) { 877 lock = writeLocks.getLock(relatedEntity); 878 lock.lock(); 879 try { 880 StartAndInsertTime relatedEntityStartAndInsertTime = 881 getAndSetStartTime(relatedEntity.getId(), relatedEntity.getType(), 882 readReverseOrderedLong(revStartTime, 0), null); 883 if (relatedEntityStartAndInsertTime == null) { 884 throw new IOException("Error setting start time for related entity"); 885 } 886 byte[] relatedEntityStartTime = writeReverseOrderedLong( 887 relatedEntityStartAndInsertTime.startTime); 888 // This is the new entity, the domain should be the same 889 byte[] key = createDomainIdKey(relatedEntity.getId(), 890 relatedEntity.getType(), relatedEntityStartTime); 891 db.put(key, entity.getDomainId().getBytes(Charset.forName("UTF-8"))); 892 db.put(createRelatedEntityKey(relatedEntity.getId(), 893 relatedEntity.getType(), relatedEntityStartTime, 894 entity.getEntityId(), entity.getEntityType()), EMPTY_BYTES); 895 db.put(createEntityMarkerKey(relatedEntity.getId(), 896 relatedEntity.getType(), relatedEntityStartTime), 897 writeReverseOrderedLong(relatedEntityStartAndInsertTime 898 .insertTime)); 899 } catch (DBException de) { 900 LOG.error("Error putting related entity " + relatedEntity.getId() + 901 " of type " + relatedEntity.getType() + " for entity " + 902 entity.getEntityId() + " of type " + entity.getEntityType(), de); 903 handleError(entity, response, TimelinePutError.IO_EXCEPTION); 904 } catch (IOException e) { 905 LOG.error("Error putting related entity " + relatedEntity.getId() + 906 " of type " + relatedEntity.getType() + " for entity " + 907 entity.getEntityId() + " of type " + entity.getEntityType(), e); 908 handleError(entity, response, TimelinePutError.IO_EXCEPTION); 909 } finally { 910 lock.unlock(); 911 writeLocks.returnLock(lock); 912 } 913 } 914 } 915 916 /** 917 * For a given key / value pair that has been written to the db, 918 * write additional entries to the db for each primary filter. 919 */ writePrimaryFilterEntries(WriteBatch writeBatch, Map<String, Set<Object>> primaryFilters, byte[] key, byte[] value)920 private static void writePrimaryFilterEntries(WriteBatch writeBatch, 921 Map<String, Set<Object>> primaryFilters, byte[] key, byte[] value) 922 throws IOException { 923 if (primaryFilters != null && !primaryFilters.isEmpty()) { 924 for (Entry<String, Set<Object>> pf : primaryFilters.entrySet()) { 925 for (Object pfval : pf.getValue()) { 926 writeBatch.put(addPrimaryFilterToKey(pf.getKey(), pfval, 927 key), value); 928 } 929 } 930 } 931 } 932 933 @Override put(TimelineEntities entities)934 public TimelinePutResponse put(TimelineEntities entities) { 935 try { 936 deleteLock.readLock().lock(); 937 TimelinePutResponse response = new TimelinePutResponse(); 938 for (TimelineEntity entity : entities.getEntities()) { 939 put(entity, response, false); 940 } 941 return response; 942 } finally { 943 deleteLock.readLock().unlock(); 944 } 945 } 946 947 @Private 948 @VisibleForTesting putWithNoDomainId(TimelineEntities entities)949 public TimelinePutResponse putWithNoDomainId(TimelineEntities entities) { 950 try { 951 deleteLock.readLock().lock(); 952 TimelinePutResponse response = new TimelinePutResponse(); 953 for (TimelineEntity entity : entities.getEntities()) { 954 put(entity, response, true); 955 } 956 return response; 957 } finally { 958 deleteLock.readLock().unlock(); 959 } 960 } 961 962 /** 963 * Get the unique start time for a given entity as a byte array that sorts 964 * the timestamps in reverse order (see {@link 965 * GenericObjectMapper#writeReverseOrderedLong(long)}). 966 * 967 * @param entityId The id of the entity 968 * @param entityType The type of the entity 969 * @return A byte array, null if not found 970 * @throws IOException 971 */ getStartTime(String entityId, String entityType)972 private byte[] getStartTime(String entityId, String entityType) 973 throws IOException { 974 Long l = getStartTimeLong(entityId, entityType); 975 return l == null ? null : writeReverseOrderedLong(l); 976 } 977 978 /** 979 * Get the unique start time for a given entity as a Long. 980 * 981 * @param entityId The id of the entity 982 * @param entityType The type of the entity 983 * @return A Long, null if not found 984 * @throws IOException 985 */ getStartTimeLong(String entityId, String entityType)986 private Long getStartTimeLong(String entityId, String entityType) 987 throws IOException { 988 EntityIdentifier entity = new EntityIdentifier(entityId, entityType); 989 try { 990 // start time is not provided, so try to look it up 991 if (startTimeReadCache.containsKey(entity)) { 992 // found the start time in the cache 993 return startTimeReadCache.get(entity); 994 } else { 995 // try to look up the start time in the db 996 byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType()); 997 byte[] v = db.get(b); 998 if (v == null) { 999 // did not find the start time in the db 1000 return null; 1001 } else { 1002 // found the start time in the db 1003 Long l = readReverseOrderedLong(v, 0); 1004 startTimeReadCache.put(entity, l); 1005 return l; 1006 } 1007 } 1008 } catch(DBException e) { 1009 throw new IOException(e); 1010 } 1011 } 1012 1013 /** 1014 * Get the unique start time for a given entity as a byte array that sorts 1015 * the timestamps in reverse order (see {@link 1016 * GenericObjectMapper#writeReverseOrderedLong(long)}). If the start time 1017 * doesn't exist, set it based on the information provided. Should only be 1018 * called when a lock has been obtained on the entity. 1019 * 1020 * @param entityId The id of the entity 1021 * @param entityType The type of the entity 1022 * @param startTime The start time of the entity, or null 1023 * @param events A list of events for the entity, or null 1024 * @return A StartAndInsertTime 1025 * @throws IOException 1026 */ getAndSetStartTime(String entityId, String entityType, Long startTime, List<TimelineEvent> events)1027 private StartAndInsertTime getAndSetStartTime(String entityId, 1028 String entityType, Long startTime, List<TimelineEvent> events) 1029 throws IOException { 1030 EntityIdentifier entity = new EntityIdentifier(entityId, entityType); 1031 if (startTime == null) { 1032 // start time is not provided, so try to look it up 1033 if (startTimeWriteCache.containsKey(entity)) { 1034 // found the start time in the cache 1035 return startTimeWriteCache.get(entity); 1036 } else { 1037 if (events != null) { 1038 // prepare a start time from events in case it is needed 1039 Long min = Long.MAX_VALUE; 1040 for (TimelineEvent e : events) { 1041 if (min > e.getTimestamp()) { 1042 min = e.getTimestamp(); 1043 } 1044 } 1045 startTime = min; 1046 } 1047 return checkStartTimeInDb(entity, startTime); 1048 } 1049 } else { 1050 // start time is provided 1051 if (startTimeWriteCache.containsKey(entity)) { 1052 // always use start time from cache if it exists 1053 return startTimeWriteCache.get(entity); 1054 } else { 1055 // check the provided start time matches the db 1056 return checkStartTimeInDb(entity, startTime); 1057 } 1058 } 1059 } 1060 1061 /** 1062 * Checks db for start time and returns it if it exists. If it doesn't 1063 * exist, writes the suggested start time (if it is not null). This is 1064 * only called when the start time is not found in the cache, 1065 * so it adds it back into the cache if it is found. Should only be called 1066 * when a lock has been obtained on the entity. 1067 */ checkStartTimeInDb(EntityIdentifier entity, Long suggestedStartTime)1068 private StartAndInsertTime checkStartTimeInDb(EntityIdentifier entity, 1069 Long suggestedStartTime) throws IOException { 1070 StartAndInsertTime startAndInsertTime = null; 1071 // create lookup key for start time 1072 byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType()); 1073 try { 1074 // retrieve value for key 1075 byte[] v = db.get(b); 1076 if (v == null) { 1077 // start time doesn't exist in db 1078 if (suggestedStartTime == null) { 1079 return null; 1080 } 1081 startAndInsertTime = new StartAndInsertTime(suggestedStartTime, 1082 System.currentTimeMillis()); 1083 1084 // write suggested start time 1085 v = new byte[16]; 1086 writeReverseOrderedLong(suggestedStartTime, v, 0); 1087 writeReverseOrderedLong(startAndInsertTime.insertTime, v, 8); 1088 WriteOptions writeOptions = new WriteOptions(); 1089 writeOptions.sync(true); 1090 db.put(b, v, writeOptions); 1091 } else { 1092 // found start time in db, so ignore suggested start time 1093 startAndInsertTime = new StartAndInsertTime(readReverseOrderedLong(v, 0), 1094 readReverseOrderedLong(v, 8)); 1095 } 1096 } catch(DBException e) { 1097 throw new IOException(e); 1098 } 1099 startTimeWriteCache.put(entity, startAndInsertTime); 1100 startTimeReadCache.put(entity, startAndInsertTime.startTime); 1101 return startAndInsertTime; 1102 } 1103 1104 /** 1105 * Creates a key for looking up the start time of a given entity, 1106 * of the form START_TIME_LOOKUP_PREFIX + entity type + entity id. 1107 */ createStartTimeLookupKey(String entityId, String entityType)1108 private static byte[] createStartTimeLookupKey(String entityId, 1109 String entityType) throws IOException { 1110 return KeyBuilder.newInstance().add(START_TIME_LOOKUP_PREFIX) 1111 .add(entityType).add(entityId).getBytes(); 1112 } 1113 1114 /** 1115 * Creates an entity marker, serializing ENTITY_ENTRY_PREFIX + entity type + 1116 * revstarttime + entity id. 1117 */ createEntityMarkerKey(String entityId, String entityType, byte[] revStartTime)1118 private static byte[] createEntityMarkerKey(String entityId, 1119 String entityType, byte[] revStartTime) throws IOException { 1120 return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX) 1121 .add(entityType).add(revStartTime).add(entityId).getBytesForLookup(); 1122 } 1123 1124 /** 1125 * Creates an index entry for the given key of the form 1126 * INDEXED_ENTRY_PREFIX + primaryfiltername + primaryfiltervalue + key. 1127 */ addPrimaryFilterToKey(String primaryFilterName, Object primaryFilterValue, byte[] key)1128 private static byte[] addPrimaryFilterToKey(String primaryFilterName, 1129 Object primaryFilterValue, byte[] key) throws IOException { 1130 return KeyBuilder.newInstance().add(INDEXED_ENTRY_PREFIX) 1131 .add(primaryFilterName) 1132 .add(GenericObjectMapper.write(primaryFilterValue), true).add(key) 1133 .getBytes(); 1134 } 1135 1136 /** 1137 * Creates an event key, serializing ENTITY_ENTRY_PREFIX + entity type + 1138 * revstarttime + entity id + EVENTS_COLUMN + reveventtimestamp + event type. 1139 */ createEntityEventKey(String entityId, String entityType, byte[] revStartTime, byte[] revEventTimestamp, String eventType)1140 private static byte[] createEntityEventKey(String entityId, 1141 String entityType, byte[] revStartTime, byte[] revEventTimestamp, 1142 String eventType) throws IOException { 1143 return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX) 1144 .add(entityType).add(revStartTime).add(entityId).add(EVENTS_COLUMN) 1145 .add(revEventTimestamp).add(eventType).getBytes(); 1146 } 1147 1148 /** 1149 * Creates an event object from the given key, offset, and value. If the 1150 * event type is not contained in the specified set of event types, 1151 * returns null. 1152 */ getEntityEvent(Set<String> eventTypes, byte[] key, int offset, byte[] value)1153 private static TimelineEvent getEntityEvent(Set<String> eventTypes, 1154 byte[] key, int offset, byte[] value) throws IOException { 1155 KeyParser kp = new KeyParser(key, offset); 1156 long ts = kp.getNextLong(); 1157 String tstype = kp.getNextString(); 1158 if (eventTypes == null || eventTypes.contains(tstype)) { 1159 TimelineEvent event = new TimelineEvent(); 1160 event.setTimestamp(ts); 1161 event.setEventType(tstype); 1162 Object o = GenericObjectMapper.read(value); 1163 if (o == null) { 1164 event.setEventInfo(null); 1165 } else if (o instanceof Map) { 1166 @SuppressWarnings("unchecked") 1167 Map<String, Object> m = (Map<String, Object>) o; 1168 event.setEventInfo(m); 1169 } else { 1170 throw new IOException("Couldn't deserialize event info map"); 1171 } 1172 return event; 1173 } 1174 return null; 1175 } 1176 1177 /** 1178 * Creates a primary filter key, serializing ENTITY_ENTRY_PREFIX + 1179 * entity type + revstarttime + entity id + PRIMARY_FILTERS_COLUMN + name + 1180 * value. 1181 */ createPrimaryFilterKey(String entityId, String entityType, byte[] revStartTime, String name, Object value)1182 private static byte[] createPrimaryFilterKey(String entityId, 1183 String entityType, byte[] revStartTime, String name, Object value) 1184 throws IOException { 1185 return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType) 1186 .add(revStartTime).add(entityId).add(PRIMARY_FILTERS_COLUMN).add(name) 1187 .add(GenericObjectMapper.write(value)).getBytes(); 1188 } 1189 1190 /** 1191 * Parses the primary filter from the given key at the given offset and 1192 * adds it to the given entity. 1193 */ addPrimaryFilter(TimelineEntity entity, byte[] key, int offset)1194 private static void addPrimaryFilter(TimelineEntity entity, byte[] key, 1195 int offset) throws IOException { 1196 KeyParser kp = new KeyParser(key, offset); 1197 String name = kp.getNextString(); 1198 Object value = GenericObjectMapper.read(key, kp.getOffset()); 1199 entity.addPrimaryFilter(name, value); 1200 } 1201 1202 /** 1203 * Creates an other info key, serializing ENTITY_ENTRY_PREFIX + entity type + 1204 * revstarttime + entity id + OTHER_INFO_COLUMN + name. 1205 */ createOtherInfoKey(String entityId, String entityType, byte[] revStartTime, String name)1206 private static byte[] createOtherInfoKey(String entityId, String entityType, 1207 byte[] revStartTime, String name) throws IOException { 1208 return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType) 1209 .add(revStartTime).add(entityId).add(OTHER_INFO_COLUMN).add(name) 1210 .getBytes(); 1211 } 1212 1213 /** 1214 * Creates a string representation of the byte array from the given offset 1215 * to the end of the array (for parsing other info keys). 1216 */ parseRemainingKey(byte[] b, int offset)1217 private static String parseRemainingKey(byte[] b, int offset) { 1218 return new String(b, offset, b.length - offset, Charset.forName("UTF-8")); 1219 } 1220 1221 /** 1222 * Creates a related entity key, serializing ENTITY_ENTRY_PREFIX + 1223 * entity type + revstarttime + entity id + RELATED_ENTITIES_COLUMN + 1224 * relatedentity type + relatedentity id. 1225 */ createRelatedEntityKey(String entityId, String entityType, byte[] revStartTime, String relatedEntityId, String relatedEntityType)1226 private static byte[] createRelatedEntityKey(String entityId, 1227 String entityType, byte[] revStartTime, String relatedEntityId, 1228 String relatedEntityType) throws IOException { 1229 return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType) 1230 .add(revStartTime).add(entityId).add(RELATED_ENTITIES_COLUMN) 1231 .add(relatedEntityType).add(relatedEntityId).getBytes(); 1232 } 1233 1234 /** 1235 * Parses the related entity from the given key at the given offset and 1236 * adds it to the given entity. 1237 */ addRelatedEntity(TimelineEntity entity, byte[] key, int offset)1238 private static void addRelatedEntity(TimelineEntity entity, byte[] key, 1239 int offset) throws IOException { 1240 KeyParser kp = new KeyParser(key, offset); 1241 String type = kp.getNextString(); 1242 String id = kp.getNextString(); 1243 entity.addRelatedEntity(type, id); 1244 } 1245 1246 /** 1247 * Creates a reverse related entity key, serializing ENTITY_ENTRY_PREFIX + 1248 * entity type + revstarttime + entity id + 1249 * INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN + 1250 * relatedentity type + relatedentity id. 1251 */ createReverseRelatedEntityKey(String entityId, String entityType, byte[] revStartTime, String relatedEntityId, String relatedEntityType)1252 private static byte[] createReverseRelatedEntityKey(String entityId, 1253 String entityType, byte[] revStartTime, String relatedEntityId, 1254 String relatedEntityType) throws IOException { 1255 return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType) 1256 .add(revStartTime).add(entityId) 1257 .add(INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN) 1258 .add(relatedEntityType).add(relatedEntityId).getBytes(); 1259 } 1260 1261 /** 1262 * Creates a domain id key, serializing ENTITY_ENTRY_PREFIX + 1263 * entity type + revstarttime + entity id + DOMAIN_ID_COLUMN. 1264 */ createDomainIdKey(String entityId, String entityType, byte[] revStartTime)1265 private static byte[] createDomainIdKey(String entityId, 1266 String entityType, byte[] revStartTime) throws IOException { 1267 return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType) 1268 .add(revStartTime).add(entityId).add(DOMAIN_ID_COLUMN).getBytes(); 1269 } 1270 /** 1271 * Clears the cache to test reloading start times from leveldb (only for 1272 * testing). 1273 */ 1274 @VisibleForTesting clearStartTimeCache()1275 void clearStartTimeCache() { 1276 startTimeWriteCache.clear(); 1277 startTimeReadCache.clear(); 1278 } 1279 1280 @VisibleForTesting getStartTimeReadCacheSize(Configuration conf)1281 static int getStartTimeReadCacheSize(Configuration conf) { 1282 return conf.getInt( 1283 YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE, 1284 YarnConfiguration. 1285 DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE); 1286 } 1287 1288 @VisibleForTesting getStartTimeWriteCacheSize(Configuration conf)1289 static int getStartTimeWriteCacheSize(Configuration conf) { 1290 return conf.getInt( 1291 YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE, 1292 YarnConfiguration. 1293 DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE); 1294 } 1295 1296 @VisibleForTesting getEntityTypes()1297 List<String> getEntityTypes() throws IOException { 1298 LeveldbIterator iterator = null; 1299 try { 1300 iterator = getDbIterator(false); 1301 List<String> entityTypes = new ArrayList<String>(); 1302 iterator.seek(ENTITY_ENTRY_PREFIX); 1303 while (iterator.hasNext()) { 1304 byte[] key = iterator.peekNext().getKey(); 1305 if (key[0] != ENTITY_ENTRY_PREFIX[0]) { 1306 break; 1307 } 1308 KeyParser kp = new KeyParser(key, 1309 ENTITY_ENTRY_PREFIX.length); 1310 String entityType = kp.getNextString(); 1311 entityTypes.add(entityType); 1312 byte[] lookupKey = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX) 1313 .add(entityType).getBytesForLookup(); 1314 if (lookupKey[lookupKey.length - 1] != 0x0) { 1315 throw new IOException("Found unexpected end byte in lookup key"); 1316 } 1317 lookupKey[lookupKey.length - 1] = 0x1; 1318 iterator.seek(lookupKey); 1319 } 1320 return entityTypes; 1321 } catch(DBException e) { 1322 throw new IOException(e); 1323 } finally { 1324 IOUtils.cleanup(LOG, iterator); 1325 } 1326 } 1327 1328 /** 1329 * Finds all keys in the db that have a given prefix and deletes them on 1330 * the given write batch. 1331 */ deleteKeysWithPrefix(WriteBatch writeBatch, byte[] prefix, LeveldbIterator iterator)1332 private void deleteKeysWithPrefix(WriteBatch writeBatch, byte[] prefix, 1333 LeveldbIterator iterator) { 1334 for (iterator.seek(prefix); iterator.hasNext(); iterator.next()) { 1335 byte[] key = iterator.peekNext().getKey(); 1336 if (!prefixMatches(prefix, prefix.length, key)) { 1337 break; 1338 } 1339 writeBatch.delete(key); 1340 } 1341 } 1342 1343 @VisibleForTesting deleteNextEntity(String entityType, byte[] reverseTimestamp, LeveldbIterator iterator, LeveldbIterator pfIterator, boolean seeked)1344 boolean deleteNextEntity(String entityType, byte[] reverseTimestamp, 1345 LeveldbIterator iterator, LeveldbIterator pfIterator, boolean seeked) 1346 throws IOException { 1347 WriteBatch writeBatch = null; 1348 try { 1349 KeyBuilder kb = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX) 1350 .add(entityType); 1351 byte[] typePrefix = kb.getBytesForLookup(); 1352 kb.add(reverseTimestamp); 1353 if (!seeked) { 1354 iterator.seek(kb.getBytesForLookup()); 1355 } 1356 if (!iterator.hasNext()) { 1357 return false; 1358 } 1359 byte[] entityKey = iterator.peekNext().getKey(); 1360 if (!prefixMatches(typePrefix, typePrefix.length, entityKey)) { 1361 return false; 1362 } 1363 1364 // read the start time and entity id from the current key 1365 KeyParser kp = new KeyParser(entityKey, typePrefix.length + 8); 1366 String entityId = kp.getNextString(); 1367 int prefixlen = kp.getOffset(); 1368 byte[] deletePrefix = new byte[prefixlen]; 1369 System.arraycopy(entityKey, 0, deletePrefix, 0, prefixlen); 1370 1371 writeBatch = db.createWriteBatch(); 1372 1373 if (LOG.isDebugEnabled()) { 1374 LOG.debug("Deleting entity type:" + entityType + " id:" + entityId); 1375 } 1376 // remove start time from cache and db 1377 writeBatch.delete(createStartTimeLookupKey(entityId, entityType)); 1378 EntityIdentifier entityIdentifier = 1379 new EntityIdentifier(entityId, entityType); 1380 startTimeReadCache.remove(entityIdentifier); 1381 startTimeWriteCache.remove(entityIdentifier); 1382 1383 // delete current entity 1384 for (; iterator.hasNext(); iterator.next()) { 1385 byte[] key = iterator.peekNext().getKey(); 1386 if (!prefixMatches(entityKey, prefixlen, key)) { 1387 break; 1388 } 1389 writeBatch.delete(key); 1390 1391 if (key.length == prefixlen) { 1392 continue; 1393 } 1394 if (key[prefixlen] == PRIMARY_FILTERS_COLUMN[0]) { 1395 kp = new KeyParser(key, 1396 prefixlen + PRIMARY_FILTERS_COLUMN.length); 1397 String name = kp.getNextString(); 1398 Object value = GenericObjectMapper.read(key, kp.getOffset()); 1399 deleteKeysWithPrefix(writeBatch, addPrimaryFilterToKey(name, value, 1400 deletePrefix), pfIterator); 1401 if (LOG.isDebugEnabled()) { 1402 LOG.debug("Deleting entity type:" + entityType + " id:" + 1403 entityId + " primary filter entry " + name + " " + 1404 value); 1405 } 1406 } else if (key[prefixlen] == RELATED_ENTITIES_COLUMN[0]) { 1407 kp = new KeyParser(key, 1408 prefixlen + RELATED_ENTITIES_COLUMN.length); 1409 String type = kp.getNextString(); 1410 String id = kp.getNextString(); 1411 byte[] relatedEntityStartTime = getStartTime(id, type); 1412 if (relatedEntityStartTime == null) { 1413 LOG.warn("Found no start time for " + 1414 "related entity " + id + " of type " + type + " while " + 1415 "deleting " + entityId + " of type " + entityType); 1416 continue; 1417 } 1418 writeBatch.delete(createReverseRelatedEntityKey(id, type, 1419 relatedEntityStartTime, entityId, entityType)); 1420 if (LOG.isDebugEnabled()) { 1421 LOG.debug("Deleting entity type:" + entityType + " id:" + 1422 entityId + " from invisible reverse related entity " + 1423 "entry of type:" + type + " id:" + id); 1424 } 1425 } else if (key[prefixlen] == 1426 INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN[0]) { 1427 kp = new KeyParser(key, prefixlen + 1428 INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN.length); 1429 String type = kp.getNextString(); 1430 String id = kp.getNextString(); 1431 byte[] relatedEntityStartTime = getStartTime(id, type); 1432 if (relatedEntityStartTime == null) { 1433 LOG.warn("Found no start time for reverse " + 1434 "related entity " + id + " of type " + type + " while " + 1435 "deleting " + entityId + " of type " + entityType); 1436 continue; 1437 } 1438 writeBatch.delete(createRelatedEntityKey(id, type, 1439 relatedEntityStartTime, entityId, entityType)); 1440 if (LOG.isDebugEnabled()) { 1441 LOG.debug("Deleting entity type:" + entityType + " id:" + 1442 entityId + " from related entity entry of type:" + 1443 type + " id:" + id); 1444 } 1445 } 1446 } 1447 WriteOptions writeOptions = new WriteOptions(); 1448 writeOptions.sync(true); 1449 db.write(writeBatch, writeOptions); 1450 return true; 1451 } catch(DBException e) { 1452 throw new IOException(e); 1453 } finally { 1454 IOUtils.cleanup(LOG, writeBatch); 1455 } 1456 } 1457 1458 /** 1459 * Discards entities with start timestamp less than or equal to the given 1460 * timestamp. 1461 */ 1462 @VisibleForTesting discardOldEntities(long timestamp)1463 void discardOldEntities(long timestamp) 1464 throws IOException, InterruptedException { 1465 byte[] reverseTimestamp = writeReverseOrderedLong(timestamp); 1466 long totalCount = 0; 1467 long t1 = System.currentTimeMillis(); 1468 try { 1469 List<String> entityTypes = getEntityTypes(); 1470 for (String entityType : entityTypes) { 1471 LeveldbIterator iterator = null; 1472 LeveldbIterator pfIterator = null; 1473 long typeCount = 0; 1474 try { 1475 deleteLock.writeLock().lock(); 1476 iterator = getDbIterator(false); 1477 pfIterator = getDbIterator(false); 1478 1479 if (deletionThread != null && deletionThread.isInterrupted()) { 1480 throw new InterruptedException(); 1481 } 1482 boolean seeked = false; 1483 while (deleteNextEntity(entityType, reverseTimestamp, iterator, 1484 pfIterator, seeked)) { 1485 typeCount++; 1486 totalCount++; 1487 seeked = true; 1488 if (deletionThread != null && deletionThread.isInterrupted()) { 1489 throw new InterruptedException(); 1490 } 1491 } 1492 } catch (IOException e) { 1493 LOG.error("Got IOException while deleting entities for type " + 1494 entityType + ", continuing to next type", e); 1495 } finally { 1496 IOUtils.cleanup(LOG, iterator, pfIterator); 1497 deleteLock.writeLock().unlock(); 1498 if (typeCount > 0) { 1499 LOG.info("Deleted " + typeCount + " entities of type " + 1500 entityType); 1501 } 1502 } 1503 } 1504 } finally { 1505 long t2 = System.currentTimeMillis(); 1506 LOG.info("Discarded " + totalCount + " entities for timestamp " + 1507 timestamp + " and earlier in " + (t2 - t1) / 1000.0 + " seconds"); 1508 } 1509 } 1510 1511 @VisibleForTesting getDbIterator(boolean fillCache)1512 LeveldbIterator getDbIterator(boolean fillCache) { 1513 ReadOptions readOptions = new ReadOptions(); 1514 readOptions.fillCache(fillCache); 1515 return new LeveldbIterator(db, readOptions); 1516 } 1517 loadVersion()1518 Version loadVersion() throws IOException { 1519 try { 1520 byte[] data = db.get(bytes(TIMELINE_STORE_VERSION_KEY)); 1521 // if version is not stored previously, treat it as CURRENT_VERSION_INFO. 1522 if (data == null || data.length == 0) { 1523 return getCurrentVersion(); 1524 } 1525 Version version = 1526 new VersionPBImpl(VersionProto.parseFrom(data)); 1527 return version; 1528 } catch(DBException e) { 1529 throw new IOException(e); 1530 } 1531 } 1532 1533 // Only used for test 1534 @VisibleForTesting storeVersion(Version state)1535 void storeVersion(Version state) throws IOException { 1536 dbStoreVersion(state); 1537 } 1538 dbStoreVersion(Version state)1539 private void dbStoreVersion(Version state) throws IOException { 1540 String key = TIMELINE_STORE_VERSION_KEY; 1541 byte[] data = 1542 ((VersionPBImpl) state).getProto().toByteArray(); 1543 try { 1544 db.put(bytes(key), data); 1545 } catch (DBException e) { 1546 throw new IOException(e); 1547 } 1548 } 1549 getCurrentVersion()1550 Version getCurrentVersion() { 1551 return CURRENT_VERSION_INFO; 1552 } 1553 1554 /** 1555 * 1) Versioning timeline store: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc. 1556 * 2) Any incompatible change of TS-store is a major upgrade, and any 1557 * compatible change of TS-store is a minor upgrade. 1558 * 3) Within a minor upgrade, say 1.1 to 1.2: 1559 * overwrite the version info and proceed as normal. 1560 * 4) Within a major upgrade, say 1.2 to 2.0: 1561 * throw exception and indicate user to use a separate upgrade tool to 1562 * upgrade timeline store or remove incompatible old state. 1563 */ checkVersion()1564 private void checkVersion() throws IOException { 1565 Version loadedVersion = loadVersion(); 1566 LOG.info("Loaded timeline store version info " + loadedVersion); 1567 if (loadedVersion.equals(getCurrentVersion())) { 1568 return; 1569 } 1570 if (loadedVersion.isCompatibleTo(getCurrentVersion())) { 1571 LOG.info("Storing timeline store version info " + getCurrentVersion()); 1572 dbStoreVersion(CURRENT_VERSION_INFO); 1573 } else { 1574 String incompatibleMessage = 1575 "Incompatible version for timeline store: expecting version " 1576 + getCurrentVersion() + ", but loading version " + loadedVersion; 1577 LOG.fatal(incompatibleMessage); 1578 throw new IOException(incompatibleMessage); 1579 } 1580 } 1581 1582 //TODO: make data retention work with the domain data as well 1583 @Override put(TimelineDomain domain)1584 public void put(TimelineDomain domain) throws IOException { 1585 WriteBatch writeBatch = null; 1586 try { 1587 writeBatch = db.createWriteBatch(); 1588 if (domain.getId() == null || domain.getId().length() == 0) { 1589 throw new IllegalArgumentException("Domain doesn't have an ID"); 1590 } 1591 if (domain.getOwner() == null || domain.getOwner().length() == 0) { 1592 throw new IllegalArgumentException("Domain doesn't have an owner."); 1593 } 1594 1595 // Write description 1596 byte[] domainEntryKey = createDomainEntryKey( 1597 domain.getId(), DESCRIPTION_COLUMN); 1598 byte[] ownerLookupEntryKey = createOwnerLookupKey( 1599 domain.getOwner(), domain.getId(), DESCRIPTION_COLUMN); 1600 if (domain.getDescription() != null) { 1601 writeBatch.put(domainEntryKey, domain.getDescription(). 1602 getBytes(Charset.forName("UTF-8"))); 1603 writeBatch.put(ownerLookupEntryKey, domain.getDescription(). 1604 getBytes(Charset.forName("UTF-8"))); 1605 } else { 1606 writeBatch.put(domainEntryKey, EMPTY_BYTES); 1607 writeBatch.put(ownerLookupEntryKey, EMPTY_BYTES); 1608 } 1609 1610 // Write owner 1611 domainEntryKey = createDomainEntryKey(domain.getId(), OWNER_COLUMN); 1612 ownerLookupEntryKey = createOwnerLookupKey( 1613 domain.getOwner(), domain.getId(), OWNER_COLUMN); 1614 // Null check for owner is done before 1615 writeBatch.put(domainEntryKey, domain.getOwner().getBytes(Charset.forName("UTF-8"))); 1616 writeBatch.put(ownerLookupEntryKey, domain.getOwner().getBytes(Charset.forName("UTF-8"))); 1617 1618 // Write readers 1619 domainEntryKey = createDomainEntryKey(domain.getId(), READER_COLUMN); 1620 ownerLookupEntryKey = createOwnerLookupKey( 1621 domain.getOwner(), domain.getId(), READER_COLUMN); 1622 if (domain.getReaders() != null && domain.getReaders().length() > 0) { 1623 writeBatch.put(domainEntryKey, domain.getReaders().getBytes(Charset.forName("UTF-8"))); 1624 writeBatch.put(ownerLookupEntryKey, domain.getReaders(). 1625 getBytes(Charset.forName("UTF-8"))); 1626 } else { 1627 writeBatch.put(domainEntryKey, EMPTY_BYTES); 1628 writeBatch.put(ownerLookupEntryKey, EMPTY_BYTES); 1629 } 1630 1631 // Write writers 1632 domainEntryKey = createDomainEntryKey(domain.getId(), WRITER_COLUMN); 1633 ownerLookupEntryKey = createOwnerLookupKey( 1634 domain.getOwner(), domain.getId(), WRITER_COLUMN); 1635 if (domain.getWriters() != null && domain.getWriters().length() > 0) { 1636 writeBatch.put(domainEntryKey, domain.getWriters().getBytes(Charset.forName("UTF-8"))); 1637 writeBatch.put(ownerLookupEntryKey, domain.getWriters(). 1638 getBytes(Charset.forName("UTF-8"))); 1639 } else { 1640 writeBatch.put(domainEntryKey, EMPTY_BYTES); 1641 writeBatch.put(ownerLookupEntryKey, EMPTY_BYTES); 1642 } 1643 1644 // Write creation time and modification time 1645 // We put both timestamps together because they are always retrieved 1646 // together, and store them in the same way as we did for the entity's 1647 // start time and insert time. 1648 domainEntryKey = createDomainEntryKey(domain.getId(), TIMESTAMP_COLUMN); 1649 ownerLookupEntryKey = createOwnerLookupKey( 1650 domain.getOwner(), domain.getId(), TIMESTAMP_COLUMN); 1651 long currentTimestamp = System.currentTimeMillis(); 1652 byte[] timestamps = db.get(domainEntryKey); 1653 if (timestamps == null) { 1654 timestamps = new byte[16]; 1655 writeReverseOrderedLong(currentTimestamp, timestamps, 0); 1656 writeReverseOrderedLong(currentTimestamp, timestamps, 8); 1657 } else { 1658 writeReverseOrderedLong(currentTimestamp, timestamps, 8); 1659 } 1660 writeBatch.put(domainEntryKey, timestamps); 1661 writeBatch.put(ownerLookupEntryKey, timestamps); 1662 db.write(writeBatch); 1663 } catch(DBException e) { 1664 throw new IOException(e); 1665 } finally { 1666 IOUtils.cleanup(LOG, writeBatch); 1667 } 1668 } 1669 1670 /** 1671 * Creates a domain entity key with column name suffix, 1672 * of the form DOMAIN_ENTRY_PREFIX + domain id + column name. 1673 */ createDomainEntryKey(String domainId, byte[] columnName)1674 private static byte[] createDomainEntryKey(String domainId, 1675 byte[] columnName) throws IOException { 1676 return KeyBuilder.newInstance().add(DOMAIN_ENTRY_PREFIX) 1677 .add(domainId).add(columnName).getBytes(); 1678 } 1679 1680 /** 1681 * Creates an owner lookup key with column name suffix, 1682 * of the form OWNER_LOOKUP_PREFIX + owner + domain id + column name. 1683 */ createOwnerLookupKey( String owner, String domainId, byte[] columnName)1684 private static byte[] createOwnerLookupKey( 1685 String owner, String domainId, byte[] columnName) throws IOException { 1686 return KeyBuilder.newInstance().add(OWNER_LOOKUP_PREFIX) 1687 .add(owner).add(domainId).add(columnName).getBytes(); 1688 } 1689 1690 @Override getDomain(String domainId)1691 public TimelineDomain getDomain(String domainId) 1692 throws IOException { 1693 LeveldbIterator iterator = null; 1694 try { 1695 byte[] prefix = KeyBuilder.newInstance() 1696 .add(DOMAIN_ENTRY_PREFIX).add(domainId).getBytesForLookup(); 1697 iterator = new LeveldbIterator(db); 1698 iterator.seek(prefix); 1699 return getTimelineDomain(iterator, domainId, prefix); 1700 } catch(DBException e) { 1701 throw new IOException(e); 1702 } finally { 1703 IOUtils.cleanup(LOG, iterator); 1704 } 1705 } 1706 1707 @Override getDomains(String owner)1708 public TimelineDomains getDomains(String owner) 1709 throws IOException { 1710 LeveldbIterator iterator = null; 1711 try { 1712 byte[] prefix = KeyBuilder.newInstance() 1713 .add(OWNER_LOOKUP_PREFIX).add(owner).getBytesForLookup(); 1714 List<TimelineDomain> domains = new ArrayList<TimelineDomain>(); 1715 for (iterator = new LeveldbIterator(db), iterator.seek(prefix); 1716 iterator.hasNext();) { 1717 byte[] key = iterator.peekNext().getKey(); 1718 if (!prefixMatches(prefix, prefix.length, key)) { 1719 break; 1720 } 1721 // Iterator to parse the rows of an individual domain 1722 KeyParser kp = new KeyParser(key, prefix.length); 1723 String domainId = kp.getNextString(); 1724 byte[] prefixExt = KeyBuilder.newInstance().add(OWNER_LOOKUP_PREFIX) 1725 .add(owner).add(domainId).getBytesForLookup(); 1726 TimelineDomain domainToReturn = 1727 getTimelineDomain(iterator, domainId, prefixExt); 1728 if (domainToReturn != null) { 1729 domains.add(domainToReturn); 1730 } 1731 } 1732 // Sort the domains to return 1733 Collections.sort(domains, new Comparator<TimelineDomain>() { 1734 @Override 1735 public int compare( 1736 TimelineDomain domain1, TimelineDomain domain2) { 1737 int result = domain2.getCreatedTime().compareTo( 1738 domain1.getCreatedTime()); 1739 if (result == 0) { 1740 return domain2.getModifiedTime().compareTo( 1741 domain1.getModifiedTime()); 1742 } else { 1743 return result; 1744 } 1745 } 1746 }); 1747 TimelineDomains domainsToReturn = new TimelineDomains(); 1748 domainsToReturn.addDomains(domains); 1749 return domainsToReturn; 1750 } catch(DBException e) { 1751 throw new IOException(e); 1752 } finally { 1753 IOUtils.cleanup(LOG, iterator); 1754 } 1755 } 1756 getTimelineDomain( LeveldbIterator iterator, String domainId, byte[] prefix)1757 private static TimelineDomain getTimelineDomain( 1758 LeveldbIterator iterator, String domainId, byte[] prefix) throws IOException { 1759 // Iterate over all the rows whose key starts with prefix to retrieve the 1760 // domain information. 1761 TimelineDomain domain = new TimelineDomain(); 1762 domain.setId(domainId); 1763 boolean noRows = true; 1764 for (; iterator.hasNext(); iterator.next()) { 1765 byte[] key = iterator.peekNext().getKey(); 1766 if (!prefixMatches(prefix, prefix.length, key)) { 1767 break; 1768 } 1769 if (noRows) { 1770 noRows = false; 1771 } 1772 byte[] value = iterator.peekNext().getValue(); 1773 if (value != null && value.length > 0) { 1774 if (key[prefix.length] == DESCRIPTION_COLUMN[0]) { 1775 domain.setDescription(new String(value, Charset.forName("UTF-8"))); 1776 } else if (key[prefix.length] == OWNER_COLUMN[0]) { 1777 domain.setOwner(new String(value, Charset.forName("UTF-8"))); 1778 } else if (key[prefix.length] == READER_COLUMN[0]) { 1779 domain.setReaders(new String(value, Charset.forName("UTF-8"))); 1780 } else if (key[prefix.length] == WRITER_COLUMN[0]) { 1781 domain.setWriters(new String(value, Charset.forName("UTF-8"))); 1782 } else if (key[prefix.length] == TIMESTAMP_COLUMN[0]) { 1783 domain.setCreatedTime(readReverseOrderedLong(value, 0)); 1784 domain.setModifiedTime(readReverseOrderedLong(value, 8)); 1785 } else { 1786 LOG.error("Unrecognized domain column: " + key[prefix.length]); 1787 } 1788 } 1789 } 1790 if (noRows) { 1791 return null; 1792 } else { 1793 return domain; 1794 } 1795 } 1796 } 1797