1 // This file is part of OpenTSDB.
2 // Copyright (C) 2015  The OpenTSDB Authors.
3 //
4 // This program is free software: you can redistribute it and/or modify it
5 // under the terms of the GNU Lesser General Public License as published by
6 // the Free Software Foundation, either version 2.1 of the License, or (at your
7 // option) any later version.  This program is distributed in the hope that it
8 // will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
9 // of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser
10 // General Public License for more details.  You should have received a copy
11 // of the GNU Lesser General Public License along with this program.  If not,
12 // see <http://www.gnu.org/licenses/>.
13 package net.opentsdb.core;
14 
15 import java.util.Collection;
16 import java.util.Map;
17 import java.util.TreeMap;
18 
19 import net.opentsdb.core.Internal.Cell;
20 import net.opentsdb.utils.DateTime;
21 
22 import org.hbase.async.Bytes;
23 import org.hbase.async.KeyValue;
24 import org.hbase.async.PutRequest;
25 import org.slf4j.Logger;
26 import org.slf4j.LoggerFactory;
27 
28 import com.stumbleupon.async.Deferred;
29 
30 /**
31  * A class that deals with serializing/deserializing appended data point columns.
32  * In busy TSDB installs appends can save on storage at write time and network
33  * bandwidth as TSD compactions are no longer necessary. Each data point is
34  * concatenated to a byte array in storage. At query time, the values are ordered
35  * and de-duped. Optionally the column can be re-written when out of order or
36  * duplicates are detected.
37  * NOTE: This will increase CPU usage on your HBase servers as it has to perform
38  * the atomic read-modify-write operation on the column.
39  * @since 2.2
40  */
41 public class AppendDataPoints {
42   private static final Logger LOG = LoggerFactory.getLogger(AppendDataPoints.class);
43 
44   /** The prefix ID of append columns */
45   public static final byte APPEND_COLUMN_PREFIX = 0x05;
46 
47   /** The full column qualifier for append columns */
48   public static final byte[] APPEND_COLUMN_QUALIFIER = new byte[] {
49     APPEND_COLUMN_PREFIX, 0x00, 0x00};
50 
51   /** A threshold in seconds where we avoid writing repairs */
52   public static final int REPAIR_THRESHOLD = 3600;
53 
54   /** Filled with the qualifiers in the compacted data points format after parsing */
55   private byte[] qualifier;
56 
57   /** Filled with the values in the compacted data points format after parsing */
58   private byte[] value;
59 
60   /** A deferred that is set if a repaired column was sent to storage */
61   private Deferred<Object> repaired_deferred = null;
62 
63   /**
64    * Default empty ctor
65    */
AppendDataPoints()66   public AppendDataPoints() {
67 
68   }
69 
70   /**
71    * Creates a new AppendDataPoints object from a qualifier and value. You can
72    * then call {@link #getBytes()} to write to TSDB.
73    * @param qualifier The qualifier with the time offset, type and length flags.
74    * @param value The value to append
75    * @throws IllegalArgumentException if the qualifier or value is null or empty
76    */
AppendDataPoints(final byte[] qualifier, final byte[] value)77   public AppendDataPoints(final byte[] qualifier, final byte[] value) {
78     if (qualifier == null || qualifier.length < 1) {
79       throw new IllegalArgumentException("Qualifier cannot be null or empty");
80     }
81     if (value == null || value.length < 1) {
82       throw new IllegalArgumentException("Value cannot be null or empty");
83     }
84     this.qualifier = qualifier;
85     this.value = value;
86   }
87 
88   /**
89    * Concatenates the qualifier and value for appending to a column in the
90    * backing data store.
91    * @return A byte array to append to the value of a column.
92    */
getBytes()93   public byte[] getBytes() {
94     final byte[] bytes = new byte[qualifier.length + value.length];
95     System.arraycopy(this.qualifier, 0, bytes, 0, qualifier.length);
96     System.arraycopy(value, 0, bytes, qualifier.length, value.length);
97     return bytes;
98   }
99 
100   /**
101    * Parses a column from storage, orders and drops newer duplicate data points.
102    * The parsing will return both a Cell collection for debugging and add
103    * the cells to concatenated qualifier and value arrays in the compacted data
104    * point format so that the results can be merged with other non-append
105    * columns or rows.
106    * <p>
107    * WARNING: If the "tsd.core.repair_appends" config is set to true then this
108    * method will issue puts against the database, overwriting the column with
109    * sorted and de-duplicated data. It will only do this for rows that are at
110    * least an hour old so as to avoid pounding current rows.
111    * <p>
112    * TODO (CL) - allow for newer or older data points depending on a config.
113    * @param tsdb The TSDB to which we belong
114    * @param kv The key value t parse
115    * @throws IllegalArgumentException if the given KV is not an append column
116    * or we were unable to parse the value.
117    */
parseKeyValue(final TSDB tsdb, final KeyValue kv)118   public final Collection<Cell> parseKeyValue(final TSDB tsdb, final KeyValue kv) {
119     if (kv.qualifier().length != 3 || kv.qualifier()[0] != APPEND_COLUMN_PREFIX) {
120       // it's really not an issue if the offset is not 0, maybe in the future
121       // we'll support appends at different offsets.
122       throw new IllegalArgumentException("Can not parse cell, it is not " +
123         " an appended cell. It has a different qualifier " +
124         Bytes.pretty(kv.qualifier()) + ", row key " + Bytes.pretty(kv.key()));
125     }
126     final boolean repair = tsdb.getConfig().repair_appends();
127     final long base_time;
128     try {
129       base_time = Internal.baseTime(tsdb, kv.key());
130     } catch (ArrayIndexOutOfBoundsException oob) {
131       throw new IllegalDataException("Corrupted value: invalid row key: " + kv,
132           oob);
133     }
134 
135     int val_idx = 0;
136     int val_length = 0;
137     int qual_length = 0;
138     int last_delta = -1;  // Time delta, extracted from the qualifier.
139 
140     final Map<Integer, Internal.Cell> deltas = new TreeMap<Integer, Cell>();
141     boolean has_duplicates = false;
142     boolean out_of_order = false;
143     boolean needs_repair = false;
144 
145     try {
146       while (val_idx < kv.value().length) {
147         byte[] q = Internal.extractQualifier(kv.value(), val_idx);
148         System.arraycopy(kv.value(), val_idx, q, 0, q.length);
149         val_idx=val_idx + q.length;
150 
151         int vlen = Internal.getValueLengthFromQualifier(q, 0);
152         byte[] v = new byte[vlen];
153         System.arraycopy(kv.value(), val_idx, v, 0, vlen);
154         val_idx += vlen;
155         int delta = Internal.getOffsetFromQualifier(q);
156 
157         final Cell duplicate = deltas.get(delta);
158         if (duplicate != null) {
159           // This is a duplicate cell, skip it
160           has_duplicates = true;
161           qual_length -= duplicate.qualifier.length;
162           val_length -= duplicate.value.length;
163         }
164 
165         qual_length += q.length;
166         val_length += vlen;
167         final Cell cell = new Cell(q, v);
168         deltas.put(delta, cell);
169 
170         if (!out_of_order) {
171           // Data points needs to be sorted if we find at least one out of
172           // order data
173           if (delta <= last_delta) {
174             out_of_order = true;
175           }
176           last_delta = delta;
177         }
178       }
179     } catch (ArrayIndexOutOfBoundsException oob) {
180       throw new IllegalDataException("Corrupted value: couldn't break down"
181           + " into individual values (consumed " + val_idx + " bytes, but was"
182           + " expecting to consume " + (kv.value().length) + "): " + kv
183           + ", cells so far: " + deltas.values(), oob);
184     }
185 
186     if (has_duplicates || out_of_order) {
187       if ((DateTime.currentTimeMillis() / 1000) - base_time > REPAIR_THRESHOLD) {
188         needs_repair = true;
189       }
190     }
191 
192     // Check we consumed all the bytes of the value.
193     if (val_idx != kv.value().length) {
194       throw new IllegalDataException("Corrupted value: couldn't break down"
195       + " into individual values (consumed " + val_idx + " bytes, but was"
196       + " expecting to consume " + (kv.value().length) + "): " + kv
197       + ", cells so far: " + deltas.values());
198     }
199 
200     val_idx = 0;
201     int qual_idx = 0;
202     byte[] healed_cell = null;
203     int healed_index = 0;
204 
205     this.value = new byte[val_length];
206     this.qualifier = new byte[qual_length];
207 
208     if (repair && needs_repair) {
209       healed_cell = new byte[val_length+qual_length];
210     }
211 
212     for (final Cell cell: deltas.values()) {
213       System.arraycopy(cell.qualifier, 0, this.qualifier, qual_idx,
214           cell.qualifier.length);
215       qual_idx += cell.qualifier.length;
216       System.arraycopy(cell.value, 0, this.value, val_idx, cell.value.length);
217       val_idx += cell.value.length;
218 
219       if (repair && needs_repair) {
220         System.arraycopy(cell.qualifier, 0, healed_cell, healed_index,
221             cell.qualifier.length);
222         healed_index += cell.qualifier.length;
223         System.arraycopy(cell.value, 0, healed_cell, healed_index, cell.value.length);
224         healed_index += cell.value.length;
225       }
226     }
227 
228     if (repair && needs_repair) {
229       LOG.debug("Repairing appended data column " + kv);
230       final PutRequest put = new PutRequest(tsdb.table, kv.key(),
231           TSDB.FAMILY(), kv.qualifier(), healed_cell);
232       repaired_deferred = tsdb.getClient().put(put);
233     }
234 
235     return deltas.values();
236   }
237 
238   /** @return the sorted qualifier in a compacted data point format after
239    * {@link #parseKeyValue(TSDB, KeyValue)} has been called */
qualifier()240   public byte[] qualifier() {
241     return qualifier;
242   }
243 
244   /** @return the sorted value in a compacted data point format after
245    * {@link #parseKeyValue(TSDB, KeyValue)} has been called */
value()246   public byte[] value() {
247     return value;
248   }
249 
250   /** @return a deferred to wait on if the call to
251    * {@link #parseKeyValue(TSDB, KeyValue)} triggered a put to storage. */
repairedDeferred()252   public Deferred<Object> repairedDeferred() {
253     return repaired_deferred;
254   }
255 
256   /** @return whether or not a qualifier of AppendDataPoints */
isAppendDataPoints(byte[] qualifier)257   public static boolean isAppendDataPoints(byte[] qualifier) {
258     return qualifier != null && qualifier.length == 3 && qualifier[0] == APPEND_COLUMN_PREFIX;
259   }
260 }
261