1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.yarn.server.timeline;
20 
21 import com.google.common.annotations.VisibleForTesting;
22 import com.google.common.base.Preconditions;
23 import org.apache.commons.collections.map.LRUMap;
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.hadoop.classification.InterfaceAudience;
27 import org.apache.hadoop.classification.InterfaceAudience.Private;
28 import org.apache.hadoop.classification.InterfaceStability;
29 import org.apache.hadoop.conf.Configuration;
30 import org.apache.hadoop.fs.FileSystem;
31 import org.apache.hadoop.fs.Path;
32 import org.apache.hadoop.fs.permission.FsPermission;
33 import org.apache.hadoop.io.IOUtils;
34 import org.apache.hadoop.io.WritableComparator;
35 import org.apache.hadoop.service.AbstractService;
36 import org.apache.hadoop.yarn.api.records.timeline.*;
37 import org.apache.hadoop.yarn.api.records.timeline.TimelineEvents.EventsOfOneEntity;
38 import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse.TimelinePutError;
39 import org.apache.hadoop.yarn.conf.YarnConfiguration;
40 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
41 import org.apache.hadoop.yarn.server.records.Version;
42 import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
43 import org.apache.hadoop.yarn.server.timeline.TimelineDataManager.CheckAcl;
44 import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyBuilder;
45 import org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.KeyParser;
46 import org.apache.hadoop.yarn.server.utils.LeveldbIterator;
47 import org.fusesource.leveldbjni.JniDBFactory;
48 import org.iq80.leveldb.*;
49 
50 import java.io.File;
51 import java.io.IOException;
52 import java.nio.charset.Charset;
53 import java.util.*;
54 import java.util.Map.Entry;
55 import java.util.concurrent.locks.ReentrantLock;
56 import java.util.concurrent.locks.ReentrantReadWriteLock;
57 
58 import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.readReverseOrderedLong;
59 import static org.apache.hadoop.yarn.server.timeline.GenericObjectMapper.writeReverseOrderedLong;
60 import static org.apache.hadoop.yarn.server.timeline.TimelineDataManager.DEFAULT_DOMAIN_ID;
61 import static org.apache.hadoop.yarn.server.timeline.util.LeveldbUtils.prefixMatches;
62 import static org.fusesource.leveldbjni.JniDBFactory.bytes;
63 
64 /**
65  * <p>An implementation of an application timeline store backed by leveldb.</p>
66  *
67  * <p>There are three sections of the db, the start time section,
68  * the entity section, and the indexed entity section.</p>
69  *
70  * <p>The start time section is used to retrieve the unique start time for
71  * a given entity. Its values each contain a start time while its keys are of
72  * the form:</p>
73  * <pre>
74  *   START_TIME_LOOKUP_PREFIX + entity type + entity id</pre>
75  *
 * <p>The entity section is ordered by entity type, then entity start time
 * descending, then entity ID. Each entity has a marker entry (the bare
 * entity key), whose value holds the entity's insert time, followed by six
 * sub-sections: events, primary filters, other info, related entities,
 * domain ID, and invisible reverse related entities. The event entries have
 * event info serialized into their values. The other info entries have
 * values corresponding to the values of the other info name/value map for
 * the entry (note the names are contained in the key). The domain ID entry
 * has the domain ID as its value. All other entries have empty values. The
 * key structure is as follows:</p>
84  * <pre>
85  *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id
86  *
87  *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
88  *     EVENTS_COLUMN + reveventtimestamp + eventtype
89  *
90  *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
91  *     PRIMARY_FILTERS_COLUMN + name + value
92  *
93  *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
94  *     OTHER_INFO_COLUMN + name
95  *
96  *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
97  *     RELATED_ENTITIES_COLUMN + relatedentity type + relatedentity id
98  *
99  *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
100  *     DOMAIN_ID_COLUMN
101  *
102  *   ENTITY_ENTRY_PREFIX + entity type + revstarttime + entity id +
103  *     INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN + relatedentity type +
104  *     relatedentity id</pre>
105  *
106  * <p>The indexed entity section contains a primary filter name and primary
107  * filter value as the prefix. Within a given name/value, entire entity
108  * entries are stored in the same format as described in the entity section
109  * above (below, "key" represents any one of the possible entity entry keys
110  * described above).</p>
111  * <pre>
112  *   INDEXED_ENTRY_PREFIX + primaryfilter name + primaryfilter value +
113  *     key</pre>
114  */
115 @InterfaceAudience.Private
116 @InterfaceStability.Unstable
117 public class LeveldbTimelineStore extends AbstractService
118     implements TimelineStore {
  private static final Log LOG = LogFactory
      .getLog(LeveldbTimelineStore.class);

  /**
   * Name of the leveldb directory that serviceInit resolves under the
   * configured timeline store path.
   */
  @Private
  @VisibleForTesting
  static final String FILENAME = "leveldb-timeline-store.ldb";

  // Key prefixes of the three db sections described in the class comment:
  // start time lookup, entity, and indexed entity.
  private static final byte[] START_TIME_LOOKUP_PREFIX = "k".getBytes(Charset.forName("UTF-8"));
  private static final byte[] ENTITY_ENTRY_PREFIX = "e".getBytes(Charset.forName("UTF-8"));
  private static final byte[] INDEXED_ENTRY_PREFIX = "i".getBytes(Charset.forName("UTF-8"));

  // Column markers that follow the entity key and select the sub-section of
  // an entity entry. Only the first byte is compared when entries are parsed.
  private static final byte[] EVENTS_COLUMN = "e".getBytes(Charset.forName("UTF-8"));
  private static final byte[] PRIMARY_FILTERS_COLUMN = "f".getBytes(Charset.forName("UTF-8"));
  private static final byte[] OTHER_INFO_COLUMN = "i".getBytes(Charset.forName("UTF-8"));
  private static final byte[] RELATED_ENTITIES_COLUMN = "r".getBytes(Charset.forName("UTF-8"));
  private static final byte[] INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN =
      "z".getBytes(Charset.forName("UTF-8"));
  private static final byte[] DOMAIN_ID_COLUMN = "d".getBytes(Charset.forName("UTF-8"));

  // NOTE(review): the names suggest these are the key prefixes and columns of
  // the domain entries; their use is outside the code shown here -- confirm
  // the exact key layout against the domain read/write methods.
  private static final byte[] DOMAIN_ENTRY_PREFIX = "d".getBytes(Charset.forName("UTF-8"));
  private static final byte[] OWNER_LOOKUP_PREFIX = "o".getBytes(Charset.forName("UTF-8"));
  private static final byte[] DESCRIPTION_COLUMN = "d".getBytes(Charset.forName("UTF-8"));
  private static final byte[] OWNER_COLUMN = "o".getBytes(Charset.forName("UTF-8"));
  private static final byte[] READER_COLUMN = "r".getBytes(Charset.forName("UTF-8"));
  private static final byte[] WRITER_COLUMN = "w".getBytes(Charset.forName("UTF-8"));
  private static final byte[] TIMESTAMP_COLUMN = "t".getBytes(Charset.forName("UTF-8"));

  // Zero-length byte array constant.
  private static final byte[] EMPTY_BYTES = new byte[0];

  // Key name for the stored schema version (presumably consulted by
  // checkVersion(), which is not shown here -- TODO confirm).
  private static final String TIMELINE_STORE_VERSION_KEY = "timeline-store-version";

  // Version 1.0 of the store layout.
  private static final Version CURRENT_VERSION_INFO = Version
      .newInstance(1, 0);

  /**
   * Permission (0700) that serviceInit applies to the leveldb directory when
   * it has to create it.
   */
  @Private
  @VisibleForTesting
  static final FsPermission LEVELDB_DIR_UMASK = FsPermission
      .createImmutable((short) 0700);

  // Synchronized LRU caches of entity start times, created in serviceInit
  // with sizes taken from the configuration.
  private Map<EntityIdentifier, StartAndInsertTime> startTimeWriteCache;
  private Map<EntityIdentifier, Long> startTimeReadCache;

  /**
   * Per-entity locks are obtained when writing.
   */
  private final LockMap<EntityIdentifier> writeLocks =
      new LockMap<EntityIdentifier>();

  // NOTE(review): presumably coordinates readers/writers with the deletion
  // of old entities; its use is outside the code shown here -- confirm.
  private final ReentrantReadWriteLock deleteLock =
      new ReentrantReadWriteLock();

  // Handle to the leveldb database; opened in serviceInit and closed in
  // serviceStop.
  private DB db;

  // Background deletion thread; only created when TTL is enabled, so it may
  // be null.
  private Thread deletionThread;
173 
  /**
   * Constructs the store, passing this class's name to the
   * {@link AbstractService} constructor. The database itself is not opened
   * until {@code serviceInit} runs.
   */
  public LeveldbTimelineStore() {
    super(LeveldbTimelineStore.class.getName());
  }
177 
178   @Override
179   @SuppressWarnings("unchecked")
serviceInit(Configuration conf)180   protected void serviceInit(Configuration conf) throws Exception {
181     Preconditions.checkArgument(conf.getLong(
182         YarnConfiguration.TIMELINE_SERVICE_TTL_MS,
183         YarnConfiguration.DEFAULT_TIMELINE_SERVICE_TTL_MS) > 0,
184         "%s property value should be greater than zero",
185         YarnConfiguration.TIMELINE_SERVICE_TTL_MS);
186     Preconditions.checkArgument(conf.getLong(
187         YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS,
188         YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS) > 0,
189         "%s property value should be greater than zero",
190         YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS);
191     Preconditions.checkArgument(conf.getLong(
192         YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
193         YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE) >= 0,
194         "%s property value should be greater than or equal to zero",
195         YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE);
196     Preconditions.checkArgument(conf.getLong(
197         YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
198         YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE) > 0,
199         " %s property value should be greater than zero",
200         YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE);
201     Preconditions.checkArgument(conf.getLong(
202         YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
203         YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE) > 0,
204         "%s property value should be greater than zero",
205         YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE);
206 
207     Options options = new Options();
208     options.createIfMissing(true);
209     options.cacheSize(conf.getLong(
210         YarnConfiguration.TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE,
211         YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_READ_CACHE_SIZE));
212     JniDBFactory factory = new JniDBFactory();
213     Path dbPath = new Path(
214         conf.get(YarnConfiguration.TIMELINE_SERVICE_LEVELDB_PATH), FILENAME);
215     FileSystem localFS = null;
216     try {
217       localFS = FileSystem.getLocal(conf);
218       if (!localFS.exists(dbPath)) {
219         if (!localFS.mkdirs(dbPath)) {
220           throw new IOException("Couldn't create directory for leveldb " +
221               "timeline store " + dbPath);
222         }
223         localFS.setPermission(dbPath, LEVELDB_DIR_UMASK);
224       }
225     } finally {
226       IOUtils.cleanup(LOG, localFS);
227     }
228     LOG.info("Using leveldb path " + dbPath);
229     db = factory.open(new File(dbPath.toString()), options);
230     checkVersion();
231     startTimeWriteCache =
232         Collections.synchronizedMap(new LRUMap(getStartTimeWriteCacheSize(
233             conf)));
234     startTimeReadCache =
235         Collections.synchronizedMap(new LRUMap(getStartTimeReadCacheSize(
236             conf)));
237 
238     if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_TTL_ENABLE, true)) {
239       deletionThread = new EntityDeletionThread(conf);
240       deletionThread.start();
241     }
242 
243     super.serviceInit(conf);
244   }
245 
246   @Override
serviceStop()247   protected void serviceStop() throws Exception {
248     if (deletionThread != null) {
249       deletionThread.interrupt();
250       LOG.info("Waiting for deletion thread to complete its current action");
251       try {
252         deletionThread.join();
253       } catch (InterruptedException e) {
254         LOG.warn("Interrupted while waiting for deletion thread to complete," +
255             " closing db now", e);
256       }
257     }
258     IOUtils.cleanup(LOG, db);
259     super.serviceStop();
260   }
261 
  /**
   * Immutable pair of an entity's start time and insert time; this is the
   * value type of the start time write cache.
   */
  private static class StartAndInsertTime {
    // start time of the entity; its reverse-ordered encoding is part of the
    // entity's keys
    final long startTime;
    // insert time of the entity; stored as the value of the entity marker
    final long insertTime;

    /**
     * @param startTime the entity's start time
     * @param insertTime the entity's insert time
     */
    public StartAndInsertTime(long startTime, long insertTime) {
      this.startTime = startTime;
      this.insertTime = insertTime;
    }
  }
271 
272   private class EntityDeletionThread extends Thread {
273     private final long ttl;
274     private final long ttlInterval;
275 
EntityDeletionThread(Configuration conf)276     public EntityDeletionThread(Configuration conf) {
277       ttl  = conf.getLong(YarnConfiguration.TIMELINE_SERVICE_TTL_MS,
278           YarnConfiguration.DEFAULT_TIMELINE_SERVICE_TTL_MS);
279       ttlInterval = conf.getLong(
280           YarnConfiguration.TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS,
281           YarnConfiguration.DEFAULT_TIMELINE_SERVICE_LEVELDB_TTL_INTERVAL_MS);
282       LOG.info("Starting deletion thread with ttl " + ttl + " and cycle " +
283           "interval " + ttlInterval);
284     }
285 
286     @Override
run()287     public void run() {
288       while (true) {
289         long timestamp = System.currentTimeMillis() - ttl;
290         try {
291           discardOldEntities(timestamp);
292           Thread.sleep(ttlInterval);
293         } catch (IOException e) {
294           LOG.error(e);
295         } catch (InterruptedException e) {
296           LOG.info("Deletion thread received interrupt, exiting");
297           break;
298         }
299       }
300     }
301   }
302 
303   private static class LockMap<K> {
304     private static class CountingReentrantLock<K> extends ReentrantLock {
305       private static final long serialVersionUID = 1L;
306       private int count;
307       private K key;
308 
CountingReentrantLock(K key)309       CountingReentrantLock(K key) {
310         super();
311         this.count = 0;
312         this.key = key;
313       }
314     }
315 
316     private Map<K, CountingReentrantLock<K>> locks =
317         new HashMap<K, CountingReentrantLock<K>>();
318 
getLock(K key)319     synchronized CountingReentrantLock<K> getLock(K key) {
320       CountingReentrantLock<K> lock = locks.get(key);
321       if (lock == null) {
322         lock = new CountingReentrantLock<K>(key);
323         locks.put(key, lock);
324       }
325 
326       lock.count++;
327       return lock;
328     }
329 
returnLock(CountingReentrantLock<K> lock)330     synchronized void returnLock(CountingReentrantLock<K> lock) {
331       if (lock.count == 0) {
332         throw new IllegalStateException("Returned lock more times than it " +
333             "was retrieved");
334       }
335       lock.count--;
336 
337       if (lock.count == 0) {
338         locks.remove(lock.key);
339       }
340     }
341   }
342 
343 
344   @Override
getEntity(String entityId, String entityType, EnumSet<Field> fields)345   public TimelineEntity getEntity(String entityId, String entityType,
346       EnumSet<Field> fields) throws IOException {
347     Long revStartTime = getStartTimeLong(entityId, entityType);
348     if (revStartTime == null) {
349       return null;
350     }
351     byte[] prefix = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
352         .add(entityType).add(writeReverseOrderedLong(revStartTime))
353         .add(entityId).getBytesForLookup();
354 
355     LeveldbIterator iterator = null;
356     try {
357       iterator = new LeveldbIterator(db);
358       iterator.seek(prefix);
359 
360       return getEntity(entityId, entityType, revStartTime, fields, iterator,
361           prefix, prefix.length);
362     } catch(DBException e) {
363       throw new IOException(e);
364     } finally {
365       IOUtils.cleanup(LOG, iterator);
366     }
367   }
368 
369   /**
370    * Read entity from a db iterator.  If no information is found in the
371    * specified fields for this entity, return null.
372    */
getEntity(String entityId, String entityType, Long startTime, EnumSet<Field> fields, LeveldbIterator iterator, byte[] prefix, int prefixlen)373   private static TimelineEntity getEntity(String entityId, String entityType,
374       Long startTime, EnumSet<Field> fields, LeveldbIterator iterator,
375       byte[] prefix, int prefixlen) throws IOException {
376     if (fields == null) {
377       fields = EnumSet.allOf(Field.class);
378     }
379 
380     TimelineEntity entity = new TimelineEntity();
381     boolean events = false;
382     boolean lastEvent = false;
383     if (fields.contains(Field.EVENTS)) {
384       events = true;
385     } else if (fields.contains(Field.LAST_EVENT_ONLY)) {
386       lastEvent = true;
387     } else {
388       entity.setEvents(null);
389     }
390     boolean relatedEntities = false;
391     if (fields.contains(Field.RELATED_ENTITIES)) {
392       relatedEntities = true;
393     } else {
394       entity.setRelatedEntities(null);
395     }
396     boolean primaryFilters = false;
397     if (fields.contains(Field.PRIMARY_FILTERS)) {
398       primaryFilters = true;
399     } else {
400       entity.setPrimaryFilters(null);
401     }
402     boolean otherInfo = false;
403     if (fields.contains(Field.OTHER_INFO)) {
404       otherInfo = true;
405     } else {
406       entity.setOtherInfo(null);
407     }
408 
409     // iterate through the entity's entry, parsing information if it is part
410     // of a requested field
411     for (; iterator.hasNext(); iterator.next()) {
412       byte[] key = iterator.peekNext().getKey();
413       if (!prefixMatches(prefix, prefixlen, key)) {
414         break;
415       }
416       if (key.length == prefixlen) {
417         continue;
418       }
419       if (key[prefixlen] == PRIMARY_FILTERS_COLUMN[0]) {
420         if (primaryFilters) {
421           addPrimaryFilter(entity, key,
422               prefixlen + PRIMARY_FILTERS_COLUMN.length);
423         }
424       } else if (key[prefixlen] == OTHER_INFO_COLUMN[0]) {
425         if (otherInfo) {
426           entity.addOtherInfo(parseRemainingKey(key,
427               prefixlen + OTHER_INFO_COLUMN.length),
428               GenericObjectMapper.read(iterator.peekNext().getValue()));
429         }
430       } else if (key[prefixlen] == RELATED_ENTITIES_COLUMN[0]) {
431         if (relatedEntities) {
432           addRelatedEntity(entity, key,
433               prefixlen + RELATED_ENTITIES_COLUMN.length);
434         }
435       } else if (key[prefixlen] == EVENTS_COLUMN[0]) {
436         if (events || (lastEvent &&
437             entity.getEvents().size() == 0)) {
438           TimelineEvent event = getEntityEvent(null, key, prefixlen +
439               EVENTS_COLUMN.length, iterator.peekNext().getValue());
440           if (event != null) {
441             entity.addEvent(event);
442           }
443         }
444       } else if (key[prefixlen] == DOMAIN_ID_COLUMN[0]) {
445         byte[] v = iterator.peekNext().getValue();
446         String domainId = new String(v, Charset.forName("UTF-8"));
447         entity.setDomainId(domainId);
448       } else {
449         if (key[prefixlen] !=
450             INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN[0]) {
451           LOG.warn(String.format("Found unexpected column for entity %s of " +
452               "type %s (0x%02x)", entityId, entityType, key[prefixlen]));
453         }
454       }
455     }
456 
457     entity.setEntityId(entityId);
458     entity.setEntityType(entityType);
459     entity.setStartTime(startTime);
460 
461     return entity;
462   }
463 
464   @Override
getEntityTimelines(String entityType, SortedSet<String> entityIds, Long limit, Long windowStart, Long windowEnd, Set<String> eventType)465   public TimelineEvents getEntityTimelines(String entityType,
466       SortedSet<String> entityIds, Long limit, Long windowStart,
467       Long windowEnd, Set<String> eventType) throws IOException {
468     TimelineEvents events = new TimelineEvents();
469     if (entityIds == null || entityIds.isEmpty()) {
470       return events;
471     }
472     // create a lexicographically-ordered map from start time to entities
473     Map<byte[], List<EntityIdentifier>> startTimeMap = new TreeMap<byte[],
474         List<EntityIdentifier>>(new Comparator<byte[]>() {
475           @Override
476           public int compare(byte[] o1, byte[] o2) {
477             return WritableComparator.compareBytes(o1, 0, o1.length, o2, 0,
478                 o2.length);
479           }
480         });
481     LeveldbIterator iterator = null;
482     try {
483       // look up start times for the specified entities
484       // skip entities with no start time
485       for (String entityId : entityIds) {
486         byte[] startTime = getStartTime(entityId, entityType);
487         if (startTime != null) {
488           List<EntityIdentifier> entities = startTimeMap.get(startTime);
489           if (entities == null) {
490             entities = new ArrayList<EntityIdentifier>();
491             startTimeMap.put(startTime, entities);
492           }
493           entities.add(new EntityIdentifier(entityId, entityType));
494         }
495       }
496       for (Entry<byte[], List<EntityIdentifier>> entry :
497           startTimeMap.entrySet()) {
498         // look up the events matching the given parameters (limit,
499         // start time, end time, event types) for entities whose start times
500         // were found and add the entities to the return list
501         byte[] revStartTime = entry.getKey();
502         for (EntityIdentifier entityIdentifier : entry.getValue()) {
503           EventsOfOneEntity entity = new EventsOfOneEntity();
504           entity.setEntityId(entityIdentifier.getId());
505           entity.setEntityType(entityType);
506           events.addEvent(entity);
507           KeyBuilder kb = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
508               .add(entityType).add(revStartTime).add(entityIdentifier.getId())
509               .add(EVENTS_COLUMN);
510           byte[] prefix = kb.getBytesForLookup();
511           if (windowEnd == null) {
512             windowEnd = Long.MAX_VALUE;
513           }
514           byte[] revts = writeReverseOrderedLong(windowEnd);
515           kb.add(revts);
516           byte[] first = kb.getBytesForLookup();
517           byte[] last = null;
518           if (windowStart != null) {
519             last = KeyBuilder.newInstance().add(prefix)
520                 .add(writeReverseOrderedLong(windowStart)).getBytesForLookup();
521           }
522           if (limit == null) {
523             limit = DEFAULT_LIMIT;
524           }
525           iterator = new LeveldbIterator(db);
526           for (iterator.seek(first); entity.getEvents().size() < limit &&
527               iterator.hasNext(); iterator.next()) {
528             byte[] key = iterator.peekNext().getKey();
529             if (!prefixMatches(prefix, prefix.length, key) || (last != null &&
530                 WritableComparator.compareBytes(key, 0, key.length, last, 0,
531                     last.length) > 0)) {
532               break;
533             }
534             TimelineEvent event = getEntityEvent(eventType, key, prefix.length,
535                 iterator.peekNext().getValue());
536             if (event != null) {
537               entity.addEvent(event);
538             }
539           }
540         }
541       }
542     } catch(DBException e) {
543       throw new IOException(e);
544     } finally {
545       IOUtils.cleanup(LOG, iterator);
546     }
547     return events;
548   }
549 
550   @Override
getEntities(String entityType, Long limit, Long windowStart, Long windowEnd, String fromId, Long fromTs, NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters, EnumSet<Field> fields, CheckAcl checkAcl)551   public TimelineEntities getEntities(String entityType,
552       Long limit, Long windowStart, Long windowEnd, String fromId, Long fromTs,
553       NameValuePair primaryFilter, Collection<NameValuePair> secondaryFilters,
554       EnumSet<Field> fields, CheckAcl checkAcl) throws IOException {
555     if (primaryFilter == null) {
556       // if no primary filter is specified, prefix the lookup with
557       // ENTITY_ENTRY_PREFIX
558       return getEntityByTime(ENTITY_ENTRY_PREFIX, entityType, limit,
559           windowStart, windowEnd, fromId, fromTs, secondaryFilters,
560           fields, checkAcl);
561     } else {
562       // if a primary filter is specified, prefix the lookup with
563       // INDEXED_ENTRY_PREFIX + primaryFilterName + primaryFilterValue +
564       // ENTITY_ENTRY_PREFIX
565       byte[] base = KeyBuilder.newInstance().add(INDEXED_ENTRY_PREFIX)
566           .add(primaryFilter.getName())
567           .add(GenericObjectMapper.write(primaryFilter.getValue()), true)
568           .add(ENTITY_ENTRY_PREFIX).getBytesForLookup();
569       return getEntityByTime(base, entityType, limit, windowStart, windowEnd,
570           fromId, fromTs, secondaryFilters, fields, checkAcl);
571     }
572   }
573 
574   /**
575    * Retrieves a list of entities satisfying given parameters.
576    *
577    * @param base A byte array prefix for the lookup
578    * @param entityType The type of the entity
579    * @param limit A limit on the number of entities to return
580    * @param starttime The earliest entity start time to retrieve (exclusive)
581    * @param endtime The latest entity start time to retrieve (inclusive)
582    * @param fromId Retrieve entities starting with this entity
583    * @param fromTs Ignore entities with insert timestamp later than this ts
584    * @param secondaryFilters Filter pairs that the entities should match
585    * @param fields The set of fields to retrieve
586    * @return A list of entities
587    * @throws IOException
588    */
getEntityByTime(byte[] base, String entityType, Long limit, Long starttime, Long endtime, String fromId, Long fromTs, Collection<NameValuePair> secondaryFilters, EnumSet<Field> fields, CheckAcl checkAcl)589   private TimelineEntities getEntityByTime(byte[] base,
590       String entityType, Long limit, Long starttime, Long endtime,
591       String fromId, Long fromTs, Collection<NameValuePair> secondaryFilters,
592       EnumSet<Field> fields, CheckAcl checkAcl) throws IOException {
593     LeveldbIterator iterator = null;
594     try {
595       KeyBuilder kb = KeyBuilder.newInstance().add(base).add(entityType);
596       // only db keys matching the prefix (base + entity type) will be parsed
597       byte[] prefix = kb.getBytesForLookup();
598       if (endtime == null) {
599         // if end time is null, place no restriction on end time
600         endtime = Long.MAX_VALUE;
601       }
602       // construct a first key that will be seeked to using end time or fromId
603       byte[] first = null;
604       if (fromId != null) {
605         Long fromIdStartTime = getStartTimeLong(fromId, entityType);
606         if (fromIdStartTime == null) {
607           // no start time for provided id, so return empty entities
608           return new TimelineEntities();
609         }
610         if (fromIdStartTime <= endtime) {
611           // if provided id's start time falls before the end of the window,
612           // use it to construct the seek key
613           first = kb.add(writeReverseOrderedLong(fromIdStartTime))
614               .add(fromId).getBytesForLookup();
615         }
616       }
617       // if seek key wasn't constructed using fromId, construct it using end ts
618       if (first == null) {
619         first = kb.add(writeReverseOrderedLong(endtime)).getBytesForLookup();
620       }
621       byte[] last = null;
622       if (starttime != null) {
623         // if start time is not null, set a last key that will not be
624         // iterated past
625         last = KeyBuilder.newInstance().add(base).add(entityType)
626             .add(writeReverseOrderedLong(starttime)).getBytesForLookup();
627       }
628       if (limit == null) {
629         // if limit is not specified, use the default
630         limit = DEFAULT_LIMIT;
631       }
632 
633       TimelineEntities entities = new TimelineEntities();
634       iterator = new LeveldbIterator(db);
635       iterator.seek(first);
636       // iterate until one of the following conditions is met: limit is
637       // reached, there are no more keys, the key prefix no longer matches,
638       // or a start time has been specified and reached/exceeded
639       while (entities.getEntities().size() < limit && iterator.hasNext()) {
640         byte[] key = iterator.peekNext().getKey();
641         if (!prefixMatches(prefix, prefix.length, key) || (last != null &&
642             WritableComparator.compareBytes(key, 0, key.length, last, 0,
643                 last.length) > 0)) {
644           break;
645         }
646         // read the start time and entity id from the current key
647         KeyParser kp = new KeyParser(key, prefix.length);
648         Long startTime = kp.getNextLong();
649         String entityId = kp.getNextString();
650 
651         if (fromTs != null) {
652           long insertTime = readReverseOrderedLong(iterator.peekNext()
653               .getValue(), 0);
654           if (insertTime > fromTs) {
655             byte[] firstKey = key;
656             while (iterator.hasNext() && prefixMatches(firstKey,
657                 kp.getOffset(), key)) {
658               iterator.next();
659               key = iterator.peekNext().getKey();
660             }
661             continue;
662           }
663         }
664 
665         // parse the entity that owns this key, iterating over all keys for
666         // the entity
667         TimelineEntity entity = getEntity(entityId, entityType, startTime,
668             fields, iterator, key, kp.getOffset());
669         // determine if the retrieved entity matches the provided secondary
670         // filters, and if so add it to the list of entities to return
671         boolean filterPassed = true;
672         if (secondaryFilters != null) {
673           for (NameValuePair filter : secondaryFilters) {
674             Object v = entity.getOtherInfo().get(filter.getName());
675             if (v == null) {
676               Set<Object> vs = entity.getPrimaryFilters()
677                   .get(filter.getName());
678               if (vs == null || !vs.contains(filter.getValue())) {
679                 filterPassed = false;
680                 break;
681               }
682             } else if (!v.equals(filter.getValue())) {
683               filterPassed = false;
684               break;
685             }
686           }
687         }
688         if (filterPassed) {
689           if (entity.getDomainId() == null) {
690             entity.setDomainId(DEFAULT_DOMAIN_ID);
691           }
692           if (checkAcl == null || checkAcl.check(entity)) {
693             entities.addEntity(entity);
694           }
695         }
696       }
697       return entities;
698     } catch(DBException e) {
699       throw new IOException(e);
700     } finally {
701       IOUtils.cleanup(LOG, iterator);
702     }
703   }
704 
705   /**
706    * Handle error and set it in response.
707    */
handleError(TimelineEntity entity, TimelinePutResponse response, final int errorCode)708   private static void handleError(TimelineEntity entity, TimelinePutResponse response, final int errorCode) {
709     TimelinePutError error = new TimelinePutError();
710     error.setEntityId(entity.getEntityId());
711     error.setEntityType(entity.getEntityType());
712     error.setErrorCode(errorCode);
713     response.addError(error);
714   }
715 
716   /**
717    * Put a single entity.  If there is an error, add a TimelinePutError to the
718    * given response.
719    */
put(TimelineEntity entity, TimelinePutResponse response, boolean allowEmptyDomainId)720   private void put(TimelineEntity entity, TimelinePutResponse response,
721       boolean allowEmptyDomainId) {
722     LockMap.CountingReentrantLock<EntityIdentifier> lock =
723         writeLocks.getLock(new EntityIdentifier(entity.getEntityId(),
724             entity.getEntityType()));
725     lock.lock();
726     WriteBatch writeBatch = null;
727     List<EntityIdentifier> relatedEntitiesWithoutStartTimes =
728         new ArrayList<EntityIdentifier>();
729     byte[] revStartTime = null;
730     Map<String, Set<Object>> primaryFilters = null;
731     try {
732       writeBatch = db.createWriteBatch();
733       List<TimelineEvent> events = entity.getEvents();
734       // look up the start time for the entity
735       StartAndInsertTime startAndInsertTime = getAndSetStartTime(
736           entity.getEntityId(), entity.getEntityType(),
737           entity.getStartTime(), events);
738       if (startAndInsertTime == null) {
739         // if no start time is found, add an error and return
740         handleError(entity, response, TimelinePutError.NO_START_TIME);
741         return;
742       }
743       revStartTime = writeReverseOrderedLong(startAndInsertTime
744           .startTime);
745 
746       primaryFilters = entity.getPrimaryFilters();
747 
748       // write entity marker
749       byte[] markerKey = createEntityMarkerKey(entity.getEntityId(),
750           entity.getEntityType(), revStartTime);
751       byte[] markerValue = writeReverseOrderedLong(startAndInsertTime
752           .insertTime);
753       writeBatch.put(markerKey, markerValue);
754       writePrimaryFilterEntries(writeBatch, primaryFilters, markerKey,
755           markerValue);
756 
757       // write event entries
758       if (events != null && !events.isEmpty()) {
759         for (TimelineEvent event : events) {
760           byte[] revts = writeReverseOrderedLong(event.getTimestamp());
761           byte[] key = createEntityEventKey(entity.getEntityId(),
762               entity.getEntityType(), revStartTime, revts,
763               event.getEventType());
764           byte[] value = GenericObjectMapper.write(event.getEventInfo());
765           writeBatch.put(key, value);
766           writePrimaryFilterEntries(writeBatch, primaryFilters, key, value);
767         }
768       }
769 
770       // write related entity entries
771       Map<String, Set<String>> relatedEntities =
772           entity.getRelatedEntities();
773       if (relatedEntities != null && !relatedEntities.isEmpty()) {
774         for (Entry<String, Set<String>> relatedEntityList :
775             relatedEntities.entrySet()) {
776           String relatedEntityType = relatedEntityList.getKey();
777           for (String relatedEntityId : relatedEntityList.getValue()) {
778             // invisible "reverse" entries (entity -> related entity)
779             byte[] key = createReverseRelatedEntityKey(entity.getEntityId(),
780                 entity.getEntityType(), revStartTime, relatedEntityId,
781                 relatedEntityType);
782             writeBatch.put(key, EMPTY_BYTES);
783             // look up start time of related entity
784             byte[] relatedEntityStartTime = getStartTime(relatedEntityId,
785                 relatedEntityType);
786             // delay writing the related entity if no start time is found
787             if (relatedEntityStartTime == null) {
788               relatedEntitiesWithoutStartTimes.add(
789                   new EntityIdentifier(relatedEntityId, relatedEntityType));
790               continue;
791             } else {
792               // This is the existing entity
793               byte[] domainIdBytes = db.get(createDomainIdKey(
794                   relatedEntityId, relatedEntityType, relatedEntityStartTime));
795               // The timeline data created by the server before 2.6 won't have
796               // the domain field. We assume this timeline data is in the
797               // default timeline domain.
798               String domainId = null;
799               if (domainIdBytes == null) {
800                 domainId = TimelineDataManager.DEFAULT_DOMAIN_ID;
801               } else {
802                 domainId = new String(domainIdBytes, Charset.forName("UTF-8"));
803               }
804               if (!domainId.equals(entity.getDomainId())) {
805                 // in this case the entity will be put, but the relation will be
806                 // ignored
807                 handleError(entity, response, TimelinePutError.FORBIDDEN_RELATION);
808                 continue;
809               }
810             }
811             // write "forward" entry (related entity -> entity)
812             key = createRelatedEntityKey(relatedEntityId,
813                 relatedEntityType, relatedEntityStartTime,
814                 entity.getEntityId(), entity.getEntityType());
815             writeBatch.put(key, EMPTY_BYTES);
816           }
817         }
818       }
819 
820       // write primary filter entries
821       if (primaryFilters != null && !primaryFilters.isEmpty()) {
822         for (Entry<String, Set<Object>> primaryFilter :
823             primaryFilters.entrySet()) {
824           for (Object primaryFilterValue : primaryFilter.getValue()) {
825             byte[] key = createPrimaryFilterKey(entity.getEntityId(),
826                 entity.getEntityType(), revStartTime,
827                 primaryFilter.getKey(), primaryFilterValue);
828             writeBatch.put(key, EMPTY_BYTES);
829             writePrimaryFilterEntries(writeBatch, primaryFilters, key,
830                 EMPTY_BYTES);
831           }
832         }
833       }
834 
835       // write other info entries
836       Map<String, Object> otherInfo = entity.getOtherInfo();
837       if (otherInfo != null && !otherInfo.isEmpty()) {
838         for (Entry<String, Object> i : otherInfo.entrySet()) {
839           byte[] key = createOtherInfoKey(entity.getEntityId(),
840               entity.getEntityType(), revStartTime, i.getKey());
841           byte[] value = GenericObjectMapper.write(i.getValue());
842           writeBatch.put(key, value);
843           writePrimaryFilterEntries(writeBatch, primaryFilters, key, value);
844         }
845       }
846 
847       // write domain id entry
848       byte[] key = createDomainIdKey(entity.getEntityId(),
849           entity.getEntityType(), revStartTime);
850       if (entity.getDomainId() == null ||
851           entity.getDomainId().length() == 0) {
852         if (!allowEmptyDomainId) {
853           handleError(entity, response, TimelinePutError.NO_DOMAIN);
854           return;
855         }
856       } else {
857         writeBatch.put(key, entity.getDomainId().getBytes(Charset.forName("UTF-8")));
858         writePrimaryFilterEntries(writeBatch, primaryFilters, key,
859             entity.getDomainId().getBytes(Charset.forName("UTF-8")));
860       }
861       db.write(writeBatch);
862     } catch (DBException de) {
863       LOG.error("Error putting entity " + entity.getEntityId() +
864                 " of type " + entity.getEntityType(), de);
865       handleError(entity, response, TimelinePutError.IO_EXCEPTION);
866     } catch (IOException e) {
867       LOG.error("Error putting entity " + entity.getEntityId() +
868           " of type " + entity.getEntityType(), e);
869       handleError(entity, response, TimelinePutError.IO_EXCEPTION);
870     } finally {
871       lock.unlock();
872       writeLocks.returnLock(lock);
873       IOUtils.cleanup(LOG, writeBatch);
874     }
875 
876     for (EntityIdentifier relatedEntity : relatedEntitiesWithoutStartTimes) {
877       lock = writeLocks.getLock(relatedEntity);
878       lock.lock();
879       try {
880         StartAndInsertTime relatedEntityStartAndInsertTime =
881             getAndSetStartTime(relatedEntity.getId(), relatedEntity.getType(),
882             readReverseOrderedLong(revStartTime, 0), null);
883         if (relatedEntityStartAndInsertTime == null) {
884           throw new IOException("Error setting start time for related entity");
885         }
886         byte[] relatedEntityStartTime = writeReverseOrderedLong(
887             relatedEntityStartAndInsertTime.startTime);
888           // This is the new entity, the domain should be the same
889         byte[] key = createDomainIdKey(relatedEntity.getId(),
890             relatedEntity.getType(), relatedEntityStartTime);
891         db.put(key, entity.getDomainId().getBytes(Charset.forName("UTF-8")));
892         db.put(createRelatedEntityKey(relatedEntity.getId(),
893             relatedEntity.getType(), relatedEntityStartTime,
894             entity.getEntityId(), entity.getEntityType()), EMPTY_BYTES);
895         db.put(createEntityMarkerKey(relatedEntity.getId(),
896             relatedEntity.getType(), relatedEntityStartTime),
897             writeReverseOrderedLong(relatedEntityStartAndInsertTime
898                 .insertTime));
899       } catch (DBException de) {
900         LOG.error("Error putting related entity " + relatedEntity.getId() +
901             " of type " + relatedEntity.getType() + " for entity " +
902             entity.getEntityId() + " of type " + entity.getEntityType(), de);
903         handleError(entity, response, TimelinePutError.IO_EXCEPTION);
904       } catch (IOException e) {
905         LOG.error("Error putting related entity " + relatedEntity.getId() +
906             " of type " + relatedEntity.getType() + " for entity " +
907             entity.getEntityId() + " of type " + entity.getEntityType(), e);
908         handleError(entity, response, TimelinePutError.IO_EXCEPTION);
909       } finally {
910         lock.unlock();
911         writeLocks.returnLock(lock);
912       }
913     }
914   }
915 
916   /**
917    * For a given key / value pair that has been written to the db,
918    * write additional entries to the db for each primary filter.
919    */
writePrimaryFilterEntries(WriteBatch writeBatch, Map<String, Set<Object>> primaryFilters, byte[] key, byte[] value)920   private static void writePrimaryFilterEntries(WriteBatch writeBatch,
921       Map<String, Set<Object>> primaryFilters, byte[] key, byte[] value)
922       throws IOException {
923     if (primaryFilters != null && !primaryFilters.isEmpty()) {
924       for (Entry<String, Set<Object>> pf : primaryFilters.entrySet()) {
925         for (Object pfval : pf.getValue()) {
926           writeBatch.put(addPrimaryFilterToKey(pf.getKey(), pfval,
927               key), value);
928         }
929       }
930     }
931   }
932 
933   @Override
put(TimelineEntities entities)934   public TimelinePutResponse put(TimelineEntities entities) {
935     try {
936       deleteLock.readLock().lock();
937       TimelinePutResponse response = new TimelinePutResponse();
938       for (TimelineEntity entity : entities.getEntities()) {
939         put(entity, response, false);
940       }
941       return response;
942     } finally {
943       deleteLock.readLock().unlock();
944     }
945   }
946 
947   @Private
948   @VisibleForTesting
putWithNoDomainId(TimelineEntities entities)949   public TimelinePutResponse putWithNoDomainId(TimelineEntities entities) {
950     try {
951       deleteLock.readLock().lock();
952       TimelinePutResponse response = new TimelinePutResponse();
953       for (TimelineEntity entity : entities.getEntities()) {
954         put(entity, response, true);
955       }
956       return response;
957     } finally {
958       deleteLock.readLock().unlock();
959     }
960   }
961 
962   /**
963    * Get the unique start time for a given entity as a byte array that sorts
964    * the timestamps in reverse order (see {@link
965    * GenericObjectMapper#writeReverseOrderedLong(long)}).
966    *
967    * @param entityId The id of the entity
968    * @param entityType The type of the entity
969    * @return A byte array, null if not found
970    * @throws IOException
971    */
getStartTime(String entityId, String entityType)972   private byte[] getStartTime(String entityId, String entityType)
973       throws IOException {
974     Long l = getStartTimeLong(entityId, entityType);
975     return l == null ? null : writeReverseOrderedLong(l);
976   }
977 
978   /**
979    * Get the unique start time for a given entity as a Long.
980    *
981    * @param entityId The id of the entity
982    * @param entityType The type of the entity
983    * @return A Long, null if not found
984    * @throws IOException
985    */
getStartTimeLong(String entityId, String entityType)986   private Long getStartTimeLong(String entityId, String entityType)
987       throws IOException {
988     EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
989     try {
990       // start time is not provided, so try to look it up
991       if (startTimeReadCache.containsKey(entity)) {
992         // found the start time in the cache
993         return startTimeReadCache.get(entity);
994       } else {
995         // try to look up the start time in the db
996         byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
997         byte[] v = db.get(b);
998         if (v == null) {
999           // did not find the start time in the db
1000           return null;
1001         } else {
1002           // found the start time in the db
1003           Long l = readReverseOrderedLong(v, 0);
1004           startTimeReadCache.put(entity, l);
1005           return l;
1006         }
1007       }
1008     } catch(DBException e) {
1009       throw new IOException(e);
1010     }
1011   }
1012 
1013   /**
1014    * Get the unique start time for a given entity as a byte array that sorts
1015    * the timestamps in reverse order (see {@link
1016    * GenericObjectMapper#writeReverseOrderedLong(long)}). If the start time
1017    * doesn't exist, set it based on the information provided. Should only be
1018    * called when a lock has been obtained on the entity.
1019    *
1020    * @param entityId The id of the entity
1021    * @param entityType The type of the entity
1022    * @param startTime The start time of the entity, or null
1023    * @param events A list of events for the entity, or null
1024    * @return A StartAndInsertTime
1025    * @throws IOException
1026    */
getAndSetStartTime(String entityId, String entityType, Long startTime, List<TimelineEvent> events)1027   private StartAndInsertTime getAndSetStartTime(String entityId,
1028       String entityType, Long startTime, List<TimelineEvent> events)
1029       throws IOException {
1030     EntityIdentifier entity = new EntityIdentifier(entityId, entityType);
1031     if (startTime == null) {
1032       // start time is not provided, so try to look it up
1033       if (startTimeWriteCache.containsKey(entity)) {
1034         // found the start time in the cache
1035         return startTimeWriteCache.get(entity);
1036       } else {
1037         if (events != null) {
1038           // prepare a start time from events in case it is needed
1039           Long min = Long.MAX_VALUE;
1040           for (TimelineEvent e : events) {
1041             if (min > e.getTimestamp()) {
1042               min = e.getTimestamp();
1043             }
1044           }
1045           startTime = min;
1046         }
1047         return checkStartTimeInDb(entity, startTime);
1048       }
1049     } else {
1050       // start time is provided
1051       if (startTimeWriteCache.containsKey(entity)) {
1052         // always use start time from cache if it exists
1053         return startTimeWriteCache.get(entity);
1054       } else {
1055         // check the provided start time matches the db
1056         return checkStartTimeInDb(entity, startTime);
1057       }
1058     }
1059   }
1060 
1061   /**
1062    * Checks db for start time and returns it if it exists.  If it doesn't
1063    * exist, writes the suggested start time (if it is not null).  This is
1064    * only called when the start time is not found in the cache,
1065    * so it adds it back into the cache if it is found. Should only be called
1066    * when a lock has been obtained on the entity.
1067    */
checkStartTimeInDb(EntityIdentifier entity, Long suggestedStartTime)1068   private StartAndInsertTime checkStartTimeInDb(EntityIdentifier entity,
1069       Long suggestedStartTime) throws IOException {
1070     StartAndInsertTime startAndInsertTime = null;
1071     // create lookup key for start time
1072     byte[] b = createStartTimeLookupKey(entity.getId(), entity.getType());
1073     try {
1074       // retrieve value for key
1075       byte[] v = db.get(b);
1076       if (v == null) {
1077         // start time doesn't exist in db
1078         if (suggestedStartTime == null) {
1079           return null;
1080         }
1081         startAndInsertTime = new StartAndInsertTime(suggestedStartTime,
1082             System.currentTimeMillis());
1083 
1084         // write suggested start time
1085         v = new byte[16];
1086         writeReverseOrderedLong(suggestedStartTime, v, 0);
1087         writeReverseOrderedLong(startAndInsertTime.insertTime, v, 8);
1088         WriteOptions writeOptions = new WriteOptions();
1089         writeOptions.sync(true);
1090         db.put(b, v, writeOptions);
1091       } else {
1092         // found start time in db, so ignore suggested start time
1093         startAndInsertTime = new StartAndInsertTime(readReverseOrderedLong(v, 0),
1094             readReverseOrderedLong(v, 8));
1095       }
1096     } catch(DBException e) {
1097       throw new IOException(e);
1098     }
1099     startTimeWriteCache.put(entity, startAndInsertTime);
1100     startTimeReadCache.put(entity, startAndInsertTime.startTime);
1101     return startAndInsertTime;
1102   }
1103 
1104   /**
1105    * Creates a key for looking up the start time of a given entity,
1106    * of the form START_TIME_LOOKUP_PREFIX + entity type + entity id.
1107    */
createStartTimeLookupKey(String entityId, String entityType)1108   private static byte[] createStartTimeLookupKey(String entityId,
1109       String entityType) throws IOException {
1110     return KeyBuilder.newInstance().add(START_TIME_LOOKUP_PREFIX)
1111         .add(entityType).add(entityId).getBytes();
1112   }
1113 
1114   /**
1115    * Creates an entity marker, serializing ENTITY_ENTRY_PREFIX + entity type +
1116    * revstarttime + entity id.
1117    */
createEntityMarkerKey(String entityId, String entityType, byte[] revStartTime)1118   private static byte[] createEntityMarkerKey(String entityId,
1119       String entityType, byte[] revStartTime) throws IOException {
1120     return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
1121         .add(entityType).add(revStartTime).add(entityId).getBytesForLookup();
1122   }
1123 
1124   /**
1125    * Creates an index entry for the given key of the form
1126    * INDEXED_ENTRY_PREFIX + primaryfiltername + primaryfiltervalue + key.
1127    */
addPrimaryFilterToKey(String primaryFilterName, Object primaryFilterValue, byte[] key)1128   private static byte[] addPrimaryFilterToKey(String primaryFilterName,
1129       Object primaryFilterValue, byte[] key) throws IOException {
1130     return KeyBuilder.newInstance().add(INDEXED_ENTRY_PREFIX)
1131         .add(primaryFilterName)
1132         .add(GenericObjectMapper.write(primaryFilterValue), true).add(key)
1133         .getBytes();
1134   }
1135 
1136   /**
1137    * Creates an event key, serializing ENTITY_ENTRY_PREFIX + entity type +
1138    * revstarttime + entity id + EVENTS_COLUMN + reveventtimestamp + event type.
1139    */
createEntityEventKey(String entityId, String entityType, byte[] revStartTime, byte[] revEventTimestamp, String eventType)1140   private static byte[] createEntityEventKey(String entityId,
1141       String entityType, byte[] revStartTime, byte[] revEventTimestamp,
1142       String eventType) throws IOException {
1143     return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
1144         .add(entityType).add(revStartTime).add(entityId).add(EVENTS_COLUMN)
1145         .add(revEventTimestamp).add(eventType).getBytes();
1146   }
1147 
1148   /**
1149    * Creates an event object from the given key, offset, and value.  If the
1150    * event type is not contained in the specified set of event types,
1151    * returns null.
1152    */
getEntityEvent(Set<String> eventTypes, byte[] key, int offset, byte[] value)1153   private static TimelineEvent getEntityEvent(Set<String> eventTypes,
1154       byte[] key, int offset, byte[] value) throws IOException {
1155     KeyParser kp = new KeyParser(key, offset);
1156     long ts = kp.getNextLong();
1157     String tstype = kp.getNextString();
1158     if (eventTypes == null || eventTypes.contains(tstype)) {
1159       TimelineEvent event = new TimelineEvent();
1160       event.setTimestamp(ts);
1161       event.setEventType(tstype);
1162       Object o = GenericObjectMapper.read(value);
1163       if (o == null) {
1164         event.setEventInfo(null);
1165       } else if (o instanceof Map) {
1166         @SuppressWarnings("unchecked")
1167         Map<String, Object> m = (Map<String, Object>) o;
1168         event.setEventInfo(m);
1169       } else {
1170         throw new IOException("Couldn't deserialize event info map");
1171       }
1172       return event;
1173     }
1174     return null;
1175   }
1176 
1177   /**
1178    * Creates a primary filter key, serializing ENTITY_ENTRY_PREFIX +
1179    * entity type + revstarttime + entity id + PRIMARY_FILTERS_COLUMN + name +
1180    * value.
1181    */
createPrimaryFilterKey(String entityId, String entityType, byte[] revStartTime, String name, Object value)1182   private static byte[] createPrimaryFilterKey(String entityId,
1183       String entityType, byte[] revStartTime, String name, Object value)
1184       throws IOException {
1185     return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType)
1186         .add(revStartTime).add(entityId).add(PRIMARY_FILTERS_COLUMN).add(name)
1187         .add(GenericObjectMapper.write(value)).getBytes();
1188   }
1189 
1190   /**
1191    * Parses the primary filter from the given key at the given offset and
1192    * adds it to the given entity.
1193    */
addPrimaryFilter(TimelineEntity entity, byte[] key, int offset)1194   private static void addPrimaryFilter(TimelineEntity entity, byte[] key,
1195       int offset) throws IOException {
1196     KeyParser kp = new KeyParser(key, offset);
1197     String name = kp.getNextString();
1198     Object value = GenericObjectMapper.read(key, kp.getOffset());
1199     entity.addPrimaryFilter(name, value);
1200   }
1201 
1202   /**
1203    * Creates an other info key, serializing ENTITY_ENTRY_PREFIX + entity type +
1204    * revstarttime + entity id + OTHER_INFO_COLUMN + name.
1205    */
createOtherInfoKey(String entityId, String entityType, byte[] revStartTime, String name)1206   private static byte[] createOtherInfoKey(String entityId, String entityType,
1207       byte[] revStartTime, String name) throws IOException {
1208     return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType)
1209         .add(revStartTime).add(entityId).add(OTHER_INFO_COLUMN).add(name)
1210         .getBytes();
1211   }
1212 
1213   /**
1214    * Creates a string representation of the byte array from the given offset
1215    * to the end of the array (for parsing other info keys).
1216    */
parseRemainingKey(byte[] b, int offset)1217   private static String parseRemainingKey(byte[] b, int offset) {
1218     return new String(b, offset, b.length - offset, Charset.forName("UTF-8"));
1219   }
1220 
1221   /**
1222    * Creates a related entity key, serializing ENTITY_ENTRY_PREFIX +
1223    * entity type + revstarttime + entity id + RELATED_ENTITIES_COLUMN +
1224    * relatedentity type + relatedentity id.
1225    */
createRelatedEntityKey(String entityId, String entityType, byte[] revStartTime, String relatedEntityId, String relatedEntityType)1226   private static byte[] createRelatedEntityKey(String entityId,
1227       String entityType, byte[] revStartTime, String relatedEntityId,
1228       String relatedEntityType) throws IOException {
1229     return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType)
1230         .add(revStartTime).add(entityId).add(RELATED_ENTITIES_COLUMN)
1231         .add(relatedEntityType).add(relatedEntityId).getBytes();
1232   }
1233 
1234   /**
1235    * Parses the related entity from the given key at the given offset and
1236    * adds it to the given entity.
1237    */
addRelatedEntity(TimelineEntity entity, byte[] key, int offset)1238   private static void addRelatedEntity(TimelineEntity entity, byte[] key,
1239       int offset) throws IOException {
1240     KeyParser kp = new KeyParser(key, offset);
1241     String type = kp.getNextString();
1242     String id = kp.getNextString();
1243     entity.addRelatedEntity(type, id);
1244   }
1245 
1246   /**
1247    * Creates a reverse related entity key, serializing ENTITY_ENTRY_PREFIX +
1248    * entity type + revstarttime + entity id +
1249    * INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN +
1250    * relatedentity type + relatedentity id.
1251    */
createReverseRelatedEntityKey(String entityId, String entityType, byte[] revStartTime, String relatedEntityId, String relatedEntityType)1252   private static byte[] createReverseRelatedEntityKey(String entityId,
1253       String entityType, byte[] revStartTime, String relatedEntityId,
1254       String relatedEntityType) throws IOException {
1255     return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType)
1256         .add(revStartTime).add(entityId)
1257         .add(INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN)
1258         .add(relatedEntityType).add(relatedEntityId).getBytes();
1259   }
1260 
1261   /**
1262    * Creates a domain id key, serializing ENTITY_ENTRY_PREFIX +
1263    * entity type + revstarttime + entity id + DOMAIN_ID_COLUMN.
1264    */
createDomainIdKey(String entityId, String entityType, byte[] revStartTime)1265   private static byte[] createDomainIdKey(String entityId,
1266       String entityType, byte[] revStartTime) throws IOException {
1267     return KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX).add(entityType)
1268         .add(revStartTime).add(entityId).add(DOMAIN_ID_COLUMN).getBytes();
1269   }
1270   /**
1271    * Clears the cache to test reloading start times from leveldb (only for
1272    * testing).
1273    */
1274   @VisibleForTesting
clearStartTimeCache()1275   void clearStartTimeCache() {
1276     startTimeWriteCache.clear();
1277     startTimeReadCache.clear();
1278   }
1279 
1280   @VisibleForTesting
getStartTimeReadCacheSize(Configuration conf)1281   static int getStartTimeReadCacheSize(Configuration conf) {
1282     return conf.getInt(
1283         YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE,
1284         YarnConfiguration.
1285             DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_READ_CACHE_SIZE);
1286   }
1287 
1288   @VisibleForTesting
getStartTimeWriteCacheSize(Configuration conf)1289   static int getStartTimeWriteCacheSize(Configuration conf) {
1290     return conf.getInt(
1291         YarnConfiguration.TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE,
1292         YarnConfiguration.
1293             DEFAULT_TIMELINE_SERVICE_LEVELDB_START_TIME_WRITE_CACHE_SIZE);
1294   }
1295 
1296   @VisibleForTesting
getEntityTypes()1297   List<String> getEntityTypes() throws IOException {
1298     LeveldbIterator iterator = null;
1299     try {
1300       iterator = getDbIterator(false);
1301       List<String> entityTypes = new ArrayList<String>();
1302       iterator.seek(ENTITY_ENTRY_PREFIX);
1303       while (iterator.hasNext()) {
1304         byte[] key = iterator.peekNext().getKey();
1305         if (key[0] != ENTITY_ENTRY_PREFIX[0]) {
1306           break;
1307         }
1308         KeyParser kp = new KeyParser(key,
1309             ENTITY_ENTRY_PREFIX.length);
1310         String entityType = kp.getNextString();
1311         entityTypes.add(entityType);
1312         byte[] lookupKey = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
1313             .add(entityType).getBytesForLookup();
1314         if (lookupKey[lookupKey.length - 1] != 0x0) {
1315           throw new IOException("Found unexpected end byte in lookup key");
1316         }
1317         lookupKey[lookupKey.length - 1] = 0x1;
1318         iterator.seek(lookupKey);
1319       }
1320       return entityTypes;
1321     } catch(DBException e) {
1322       throw new IOException(e);
1323     } finally {
1324       IOUtils.cleanup(LOG, iterator);
1325     }
1326   }
1327 
1328   /**
1329    * Finds all keys in the db that have a given prefix and deletes them on
1330    * the given write batch.
1331    */
deleteKeysWithPrefix(WriteBatch writeBatch, byte[] prefix, LeveldbIterator iterator)1332   private void deleteKeysWithPrefix(WriteBatch writeBatch, byte[] prefix,
1333       LeveldbIterator iterator) {
1334     for (iterator.seek(prefix); iterator.hasNext(); iterator.next()) {
1335       byte[] key = iterator.peekNext().getKey();
1336       if (!prefixMatches(prefix, prefix.length, key)) {
1337         break;
1338       }
1339       writeBatch.delete(key);
1340     }
1341   }
1342 
1343   @VisibleForTesting
  boolean deleteNextEntity(String entityType, byte[] reverseTimestamp,
      LeveldbIterator iterator, LeveldbIterator pfIterator, boolean seeked)
      throws IOException {
    // Deletes the single entity that the given iterator currently points at,
    // provided its key still belongs to entityType. All deletions are
    // collected in one write batch and written with sync enabled.
    //
    // entityType       type whose entries are being deleted
    // reverseTimestamp appended to the type prefix to form the seek key; only
    //                  used when seeked is false
    // iterator         walks the entity entries; left positioned on the first
    //                  key that does not belong to the deleted entity
    // pfIterator       second iterator handed to deleteKeysWithPrefix for the
    //                  primary filter entries
    // seeked           true when the caller has already positioned iterator
    //
    // Returns true if an entity was deleted, false if the iterator is
    // exhausted or the next key does not match the type prefix.
    // A DBException raised by the db is rethrown wrapped in an IOException.
    WriteBatch writeBatch = null;
    try {
      KeyBuilder kb = KeyBuilder.newInstance().add(ENTITY_ENTRY_PREFIX)
          .add(entityType);
      byte[] typePrefix = kb.getBytesForLookup();
      kb.add(reverseTimestamp);
      if (!seeked) {
        // first call for this type: position the iterator at
        // type prefix + reverseTimestamp
        iterator.seek(kb.getBytesForLookup());
      }
      if (!iterator.hasNext()) {
        return false;
      }
      byte[] entityKey = iterator.peekNext().getKey();
      if (!prefixMatches(typePrefix, typePrefix.length, entityKey)) {
        // the iterator has moved past the entries of this entity type
        return false;
      }

      // read the start time and entity id from the current key
      // (the 8 bytes skipped after the type prefix are presumably the
      // reverse-ordered start time -- confirm against the key layout)
      KeyParser kp = new KeyParser(entityKey, typePrefix.length + 8);
      String entityId = kp.getNextString();
      int prefixlen = kp.getOffset();
      // deletePrefix = the first prefixlen bytes of the entity key, i.e.
      // everything up to the parser offset after reading the entity id
      byte[] deletePrefix = new byte[prefixlen];
      System.arraycopy(entityKey, 0, deletePrefix, 0, prefixlen);

      writeBatch = db.createWriteBatch();

      if (LOG.isDebugEnabled()) {
        LOG.debug("Deleting entity type:" + entityType + " id:" + entityId);
      }
      // remove start time from cache and db
      writeBatch.delete(createStartTimeLookupKey(entityId, entityType));
      EntityIdentifier entityIdentifier =
          new EntityIdentifier(entityId, entityType);
      startTimeReadCache.remove(entityIdentifier);
      startTimeWriteCache.remove(entityIdentifier);

      // delete current entity
      // Every key sharing the first prefixlen bytes of entityKey is added to
      // the batch; the loop stops at the first key that does not match.
      for (; iterator.hasNext(); iterator.next()) {
        byte[] key = iterator.peekNext().getKey();
        if (!prefixMatches(entityKey, prefixlen, key)) {
          break;
        }
        writeBatch.delete(key);

        if (key.length == prefixlen) {
          // key has no column suffix, so there is nothing further to inspect
          continue;
        }
        // The byte right after the entity prefix is compared with the first
        // byte of each column constant to decide which extra entries to delete.
        if (key[prefixlen] == PRIMARY_FILTERS_COLUMN[0]) {
          kp = new KeyParser(key,
              prefixlen + PRIMARY_FILTERS_COLUMN.length);
          String name = kp.getNextString();
          Object value = GenericObjectMapper.read(key, kp.getOffset());
          // hand the key built from this filter name/value and deletePrefix to
          // deleteKeysWithPrefix, together with the separate pfIterator
          deleteKeysWithPrefix(writeBatch, addPrimaryFilterToKey(name, value,
              deletePrefix), pfIterator);
          if (LOG.isDebugEnabled()) {
            LOG.debug("Deleting entity type:" + entityType + " id:" +
                entityId + " primary filter entry " + name + " " +
                value);
          }
        } else if (key[prefixlen] == RELATED_ENTITIES_COLUMN[0]) {
          kp = new KeyParser(key,
              prefixlen + RELATED_ENTITIES_COLUMN.length);
          String type = kp.getNextString();
          String id = kp.getNextString();
          byte[] relatedEntityStartTime = getStartTime(id, type);
          if (relatedEntityStartTime == null) {
            // without a start time the reverse key cannot be built; skip it
            LOG.warn("Found no start time for " +
                "related entity " + id + " of type " + type + " while " +
                "deleting " + entityId + " of type " + entityType);
            continue;
          }
          // delete the reverse entry stored under the related entity
          writeBatch.delete(createReverseRelatedEntityKey(id, type,
              relatedEntityStartTime, entityId, entityType));
          if (LOG.isDebugEnabled()) {
            LOG.debug("Deleting entity type:" + entityType + " id:" +
                entityId + " from invisible reverse related entity " +
                "entry of type:" + type + " id:" + id);
          }
        } else if (key[prefixlen] ==
            INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN[0]) {
          kp = new KeyParser(key, prefixlen +
              INVISIBLE_REVERSE_RELATED_ENTITIES_COLUMN.length);
          String type = kp.getNextString();
          String id = kp.getNextString();
          byte[] relatedEntityStartTime = getStartTime(id, type);
          if (relatedEntityStartTime == null) {
            // without a start time the related entity key cannot be built
            LOG.warn("Found no start time for reverse " +
                "related entity " + id + " of type " + type + " while " +
                "deleting " + entityId + " of type " + entityType);
            continue;
          }
          // delete the forward related-entity entry stored under the other
          // entity
          writeBatch.delete(createRelatedEntityKey(id, type,
              relatedEntityStartTime, entityId, entityType));
          if (LOG.isDebugEnabled()) {
            LOG.debug("Deleting entity type:" + entityType + " id:" +
                entityId + " from related entity entry of type:" +
                type + " id:" + id);
          }
        }
      }
      // apply all collected deletions in a single write with sync enabled
      WriteOptions writeOptions = new WriteOptions();
      writeOptions.sync(true);
      db.write(writeBatch, writeOptions);
      return true;
    } catch(DBException e) {
      throw new IOException(e);
    } finally {
      // closes the batch if one was created; writeBatch may still be null here
      IOUtils.cleanup(LOG, writeBatch);
    }
  }
1457 
1458   /**
1459    * Discards entities with start timestamp less than or equal to the given
1460    * timestamp.
1461    */
1462   @VisibleForTesting
discardOldEntities(long timestamp)1463   void discardOldEntities(long timestamp)
1464       throws IOException, InterruptedException {
1465     byte[] reverseTimestamp = writeReverseOrderedLong(timestamp);
1466     long totalCount = 0;
1467     long t1 = System.currentTimeMillis();
1468     try {
1469       List<String> entityTypes = getEntityTypes();
1470       for (String entityType : entityTypes) {
1471         LeveldbIterator iterator = null;
1472         LeveldbIterator pfIterator = null;
1473         long typeCount = 0;
1474         try {
1475           deleteLock.writeLock().lock();
1476           iterator = getDbIterator(false);
1477           pfIterator = getDbIterator(false);
1478 
1479           if (deletionThread != null && deletionThread.isInterrupted()) {
1480             throw new InterruptedException();
1481           }
1482           boolean seeked = false;
1483           while (deleteNextEntity(entityType, reverseTimestamp, iterator,
1484               pfIterator, seeked)) {
1485             typeCount++;
1486             totalCount++;
1487             seeked = true;
1488             if (deletionThread != null && deletionThread.isInterrupted()) {
1489               throw new InterruptedException();
1490             }
1491           }
1492         } catch (IOException e) {
1493           LOG.error("Got IOException while deleting entities for type " +
1494               entityType + ", continuing to next type", e);
1495         } finally {
1496           IOUtils.cleanup(LOG, iterator, pfIterator);
1497           deleteLock.writeLock().unlock();
1498           if (typeCount > 0) {
1499             LOG.info("Deleted " + typeCount + " entities of type " +
1500                 entityType);
1501           }
1502         }
1503       }
1504     } finally {
1505       long t2 = System.currentTimeMillis();
1506       LOG.info("Discarded " + totalCount + " entities for timestamp " +
1507           timestamp + " and earlier in " + (t2 - t1) / 1000.0 + " seconds");
1508     }
1509   }
1510 
1511   @VisibleForTesting
getDbIterator(boolean fillCache)1512   LeveldbIterator getDbIterator(boolean fillCache) {
1513     ReadOptions readOptions = new ReadOptions();
1514     readOptions.fillCache(fillCache);
1515     return new LeveldbIterator(db, readOptions);
1516   }
1517 
loadVersion()1518   Version loadVersion() throws IOException {
1519     try {
1520       byte[] data = db.get(bytes(TIMELINE_STORE_VERSION_KEY));
1521       // if version is not stored previously, treat it as CURRENT_VERSION_INFO.
1522       if (data == null || data.length == 0) {
1523         return getCurrentVersion();
1524       }
1525       Version version =
1526           new VersionPBImpl(VersionProto.parseFrom(data));
1527       return version;
1528     } catch(DBException e) {
1529       throw new IOException(e);
1530     }
1531   }
1532 
  /**
   * Stores the given version by delegating to
   * {@code dbStoreVersion(Version)}.
   *
   * @param state the version to store
   * @throws IOException if {@code dbStoreVersion} fails
   */
  // Only used for test
  @VisibleForTesting
  void storeVersion(Version state) throws IOException {
    dbStoreVersion(state);
  }
1538 
dbStoreVersion(Version state)1539   private void dbStoreVersion(Version state) throws IOException {
1540     String key = TIMELINE_STORE_VERSION_KEY;
1541     byte[] data =
1542         ((VersionPBImpl) state).getProto().toByteArray();
1543     try {
1544       db.put(bytes(key), data);
1545     } catch (DBException e) {
1546       throw new IOException(e);
1547     }
1548   }
1549 
  /**
   * Returns the version this store treats as current.
   *
   * @return the {@code CURRENT_VERSION_INFO} constant
   */
  Version getCurrentVersion() {
    return CURRENT_VERSION_INFO;
  }
1553 
1554   /**
1555    * 1) Versioning timeline store: major.minor. For e.g. 1.0, 1.1, 1.2...1.25, 2.0 etc.
1556    * 2) Any incompatible change of TS-store is a major upgrade, and any
1557    *    compatible change of TS-store is a minor upgrade.
1558    * 3) Within a minor upgrade, say 1.1 to 1.2:
1559    *    overwrite the version info and proceed as normal.
1560    * 4) Within a major upgrade, say 1.2 to 2.0:
1561    *    throw exception and indicate user to use a separate upgrade tool to
1562    *    upgrade timeline store or remove incompatible old state.
1563    */
checkVersion()1564   private void checkVersion() throws IOException {
1565     Version loadedVersion = loadVersion();
1566     LOG.info("Loaded timeline store version info " + loadedVersion);
1567     if (loadedVersion.equals(getCurrentVersion())) {
1568       return;
1569     }
1570     if (loadedVersion.isCompatibleTo(getCurrentVersion())) {
1571       LOG.info("Storing timeline store version info " + getCurrentVersion());
1572       dbStoreVersion(CURRENT_VERSION_INFO);
1573     } else {
1574       String incompatibleMessage =
1575           "Incompatible version for timeline store: expecting version "
1576               + getCurrentVersion() + ", but loading version " + loadedVersion;
1577       LOG.fatal(incompatibleMessage);
1578       throw new IOException(incompatibleMessage);
1579     }
1580   }
1581 
1582   //TODO: make data retention work with the domain data as well
1583   @Override
put(TimelineDomain domain)1584   public void put(TimelineDomain domain) throws IOException {
1585     WriteBatch writeBatch = null;
1586     try {
1587       writeBatch = db.createWriteBatch();
1588       if (domain.getId() == null || domain.getId().length() == 0) {
1589         throw new IllegalArgumentException("Domain doesn't have an ID");
1590       }
1591       if (domain.getOwner() == null || domain.getOwner().length() == 0) {
1592         throw new IllegalArgumentException("Domain doesn't have an owner.");
1593       }
1594 
1595       // Write description
1596       byte[] domainEntryKey = createDomainEntryKey(
1597           domain.getId(), DESCRIPTION_COLUMN);
1598       byte[] ownerLookupEntryKey = createOwnerLookupKey(
1599           domain.getOwner(), domain.getId(), DESCRIPTION_COLUMN);
1600       if (domain.getDescription() != null) {
1601         writeBatch.put(domainEntryKey, domain.getDescription().
1602                        getBytes(Charset.forName("UTF-8")));
1603         writeBatch.put(ownerLookupEntryKey, domain.getDescription().
1604                        getBytes(Charset.forName("UTF-8")));
1605       } else {
1606         writeBatch.put(domainEntryKey, EMPTY_BYTES);
1607         writeBatch.put(ownerLookupEntryKey, EMPTY_BYTES);
1608       }
1609 
1610       // Write owner
1611       domainEntryKey = createDomainEntryKey(domain.getId(), OWNER_COLUMN);
1612       ownerLookupEntryKey = createOwnerLookupKey(
1613           domain.getOwner(), domain.getId(), OWNER_COLUMN);
1614       // Null check for owner is done before
1615       writeBatch.put(domainEntryKey, domain.getOwner().getBytes(Charset.forName("UTF-8")));
1616       writeBatch.put(ownerLookupEntryKey, domain.getOwner().getBytes(Charset.forName("UTF-8")));
1617 
1618       // Write readers
1619       domainEntryKey = createDomainEntryKey(domain.getId(), READER_COLUMN);
1620       ownerLookupEntryKey = createOwnerLookupKey(
1621           domain.getOwner(), domain.getId(), READER_COLUMN);
1622       if (domain.getReaders() != null && domain.getReaders().length() > 0) {
1623         writeBatch.put(domainEntryKey, domain.getReaders().getBytes(Charset.forName("UTF-8")));
1624         writeBatch.put(ownerLookupEntryKey, domain.getReaders().
1625                        getBytes(Charset.forName("UTF-8")));
1626       } else {
1627         writeBatch.put(domainEntryKey, EMPTY_BYTES);
1628         writeBatch.put(ownerLookupEntryKey, EMPTY_BYTES);
1629       }
1630 
1631       // Write writers
1632       domainEntryKey = createDomainEntryKey(domain.getId(), WRITER_COLUMN);
1633       ownerLookupEntryKey = createOwnerLookupKey(
1634           domain.getOwner(), domain.getId(), WRITER_COLUMN);
1635       if (domain.getWriters() != null && domain.getWriters().length() > 0) {
1636         writeBatch.put(domainEntryKey, domain.getWriters().getBytes(Charset.forName("UTF-8")));
1637         writeBatch.put(ownerLookupEntryKey, domain.getWriters().
1638                        getBytes(Charset.forName("UTF-8")));
1639       } else {
1640         writeBatch.put(domainEntryKey, EMPTY_BYTES);
1641         writeBatch.put(ownerLookupEntryKey, EMPTY_BYTES);
1642       }
1643 
1644       // Write creation time and modification time
1645       // We put both timestamps together because they are always retrieved
1646       // together, and store them in the same way as we did for the entity's
1647       // start time and insert time.
1648       domainEntryKey = createDomainEntryKey(domain.getId(), TIMESTAMP_COLUMN);
1649       ownerLookupEntryKey = createOwnerLookupKey(
1650           domain.getOwner(), domain.getId(), TIMESTAMP_COLUMN);
1651       long currentTimestamp = System.currentTimeMillis();
1652       byte[] timestamps = db.get(domainEntryKey);
1653       if (timestamps == null) {
1654         timestamps = new byte[16];
1655         writeReverseOrderedLong(currentTimestamp, timestamps, 0);
1656         writeReverseOrderedLong(currentTimestamp, timestamps, 8);
1657       } else {
1658         writeReverseOrderedLong(currentTimestamp, timestamps, 8);
1659       }
1660       writeBatch.put(domainEntryKey, timestamps);
1661       writeBatch.put(ownerLookupEntryKey, timestamps);
1662       db.write(writeBatch);
1663     } catch(DBException e) {
1664       throw new IOException(e);
1665     } finally {
1666       IOUtils.cleanup(LOG, writeBatch);
1667     }
1668   }
1669 
1670   /**
1671    * Creates a domain entity key with column name suffix,
1672    * of the form DOMAIN_ENTRY_PREFIX + domain id + column name.
1673    */
createDomainEntryKey(String domainId, byte[] columnName)1674   private static byte[] createDomainEntryKey(String domainId,
1675       byte[] columnName) throws IOException {
1676     return KeyBuilder.newInstance().add(DOMAIN_ENTRY_PREFIX)
1677         .add(domainId).add(columnName).getBytes();
1678   }
1679 
1680   /**
1681    * Creates an owner lookup key with column name suffix,
1682    * of the form OWNER_LOOKUP_PREFIX + owner + domain id + column name.
1683    */
createOwnerLookupKey( String owner, String domainId, byte[] columnName)1684   private static byte[] createOwnerLookupKey(
1685       String owner, String domainId, byte[] columnName) throws IOException {
1686     return KeyBuilder.newInstance().add(OWNER_LOOKUP_PREFIX)
1687         .add(owner).add(domainId).add(columnName).getBytes();
1688   }
1689 
1690   @Override
getDomain(String domainId)1691   public TimelineDomain getDomain(String domainId)
1692       throws IOException {
1693     LeveldbIterator iterator = null;
1694     try {
1695       byte[] prefix = KeyBuilder.newInstance()
1696           .add(DOMAIN_ENTRY_PREFIX).add(domainId).getBytesForLookup();
1697       iterator = new LeveldbIterator(db);
1698       iterator.seek(prefix);
1699       return getTimelineDomain(iterator, domainId, prefix);
1700     } catch(DBException e) {
1701       throw new IOException(e);
1702     } finally {
1703       IOUtils.cleanup(LOG, iterator);
1704     }
1705   }
1706 
1707   @Override
getDomains(String owner)1708   public TimelineDomains getDomains(String owner)
1709       throws IOException {
1710     LeveldbIterator iterator = null;
1711     try {
1712       byte[] prefix = KeyBuilder.newInstance()
1713           .add(OWNER_LOOKUP_PREFIX).add(owner).getBytesForLookup();
1714       List<TimelineDomain> domains = new ArrayList<TimelineDomain>();
1715       for (iterator = new LeveldbIterator(db), iterator.seek(prefix);
1716           iterator.hasNext();) {
1717         byte[] key = iterator.peekNext().getKey();
1718         if (!prefixMatches(prefix, prefix.length, key)) {
1719           break;
1720         }
1721         // Iterator to parse the rows of an individual domain
1722         KeyParser kp = new KeyParser(key, prefix.length);
1723         String domainId = kp.getNextString();
1724         byte[] prefixExt = KeyBuilder.newInstance().add(OWNER_LOOKUP_PREFIX)
1725             .add(owner).add(domainId).getBytesForLookup();
1726         TimelineDomain domainToReturn =
1727             getTimelineDomain(iterator, domainId, prefixExt);
1728         if (domainToReturn != null) {
1729           domains.add(domainToReturn);
1730         }
1731       }
1732       // Sort the domains to return
1733       Collections.sort(domains, new Comparator<TimelineDomain>() {
1734         @Override
1735         public int compare(
1736             TimelineDomain domain1, TimelineDomain domain2) {
1737            int result = domain2.getCreatedTime().compareTo(
1738                domain1.getCreatedTime());
1739            if (result == 0) {
1740              return domain2.getModifiedTime().compareTo(
1741                  domain1.getModifiedTime());
1742            } else {
1743              return result;
1744            }
1745         }
1746       });
1747       TimelineDomains domainsToReturn = new TimelineDomains();
1748       domainsToReturn.addDomains(domains);
1749       return domainsToReturn;
1750     } catch(DBException e) {
1751       throw new IOException(e);
1752     } finally {
1753       IOUtils.cleanup(LOG, iterator);
1754     }
1755   }
1756 
getTimelineDomain( LeveldbIterator iterator, String domainId, byte[] prefix)1757   private static TimelineDomain getTimelineDomain(
1758       LeveldbIterator iterator, String domainId, byte[] prefix) throws IOException {
1759     // Iterate over all the rows whose key starts with prefix to retrieve the
1760     // domain information.
1761     TimelineDomain domain = new TimelineDomain();
1762     domain.setId(domainId);
1763     boolean noRows = true;
1764     for (; iterator.hasNext(); iterator.next()) {
1765       byte[] key = iterator.peekNext().getKey();
1766       if (!prefixMatches(prefix, prefix.length, key)) {
1767         break;
1768       }
1769       if (noRows) {
1770         noRows = false;
1771       }
1772       byte[] value = iterator.peekNext().getValue();
1773       if (value != null && value.length > 0) {
1774         if (key[prefix.length] == DESCRIPTION_COLUMN[0]) {
1775           domain.setDescription(new String(value, Charset.forName("UTF-8")));
1776         } else if (key[prefix.length] == OWNER_COLUMN[0]) {
1777           domain.setOwner(new String(value, Charset.forName("UTF-8")));
1778         } else if (key[prefix.length] == READER_COLUMN[0]) {
1779           domain.setReaders(new String(value, Charset.forName("UTF-8")));
1780         } else if (key[prefix.length] == WRITER_COLUMN[0]) {
1781           domain.setWriters(new String(value, Charset.forName("UTF-8")));
1782         } else if (key[prefix.length] == TIMESTAMP_COLUMN[0]) {
1783           domain.setCreatedTime(readReverseOrderedLong(value, 0));
1784           domain.setModifiedTime(readReverseOrderedLong(value, 8));
1785         } else {
1786           LOG.error("Unrecognized domain column: " + key[prefix.length]);
1787         }
1788       }
1789     }
1790     if (noRows) {
1791       return null;
1792     } else {
1793       return domain;
1794     }
1795   }
1796 }
1797