// This file is part of OpenTSDB.
// Copyright (C) 2015 The OpenTSDB Authors.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 2.1 of the License, or (at your
// option) any later version. This program is distributed in the hope that it
// will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
// of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser
// General Public License for more details. You should have received a copy
// of the GNU Lesser General Public License along with this program. If not,
// see <http://www.gnu.org/licenses/>.
package net.opentsdb.core;

import java.util.Collection;
import java.util.Map;
import java.util.TreeMap;

import net.opentsdb.core.Internal.Cell;
import net.opentsdb.utils.DateTime;

import org.hbase.async.Bytes;
import org.hbase.async.KeyValue;
import org.hbase.async.PutRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.stumbleupon.async.Deferred;

/**
 * A class that deals with serializing/deserializing appended data point columns.
 * In busy TSDB installs appends can save on storage at write time and network
 * bandwidth as TSD compactions are no longer necessary. Each data point is
 * concatenated to a byte array in storage. At query time, the values are ordered
 * and de-duped. Optionally the column can be re-written when out of order or
 * duplicates are detected.
 * NOTE: This will increase CPU usage on your HBase servers as it has to perform
 * the atomic read-modify-write operation on the column.
 * @since 2.2
 */
public class AppendDataPoints {
  private static final Logger LOG = LoggerFactory.getLogger(AppendDataPoints.class);

  /** The prefix ID of append columns */
  public static final byte APPEND_COLUMN_PREFIX = 0x05;

  /** The full column qualifier for append columns */
  public static final byte[] APPEND_COLUMN_QUALIFIER = new byte[] {
    APPEND_COLUMN_PREFIX, 0x00, 0x00};

  /** A threshold in seconds where we avoid writing repairs */
  public static final int REPAIR_THRESHOLD = 3600;

  /** Filled with the qualifiers in the compacted data points format after parsing */
  private byte[] qualifier;

  /** Filled with the values in the compacted data points format after parsing */
  private byte[] value;

  /** A deferred that is set if a repaired column was sent to storage */
  private Deferred<Object> repaired_deferred = null;

  /**
   * Default empty ctor
   */
  public AppendDataPoints() {

  }

  /**
   * Creates a new AppendDataPoints object from a qualifier and value. You can
   * then call {@link #getBytes()} to write to TSDB.
   * @param qualifier The qualifier with the time offset, type and length flags.
   * @param value The value to append
   * @throws IllegalArgumentException if the qualifier or value is null or empty
   */
  public AppendDataPoints(final byte[] qualifier, final byte[] value) {
    if (qualifier == null || qualifier.length < 1) {
      throw new IllegalArgumentException("Qualifier cannot be null or empty");
    }
    if (value == null || value.length < 1) {
      throw new IllegalArgumentException("Value cannot be null or empty");
    }
    this.qualifier = qualifier;
    this.value = value;
  }

  /**
   * Concatenates the qualifier and value for appending to a column in the
   * backing data store.
   * @return A byte array to append to the value of a column.
   */
  public byte[] getBytes() {
    final byte[] bytes = new byte[qualifier.length + value.length];
    System.arraycopy(qualifier, 0, bytes, 0, qualifier.length);
    System.arraycopy(value, 0, bytes, qualifier.length, value.length);
    return bytes;
  }

  /**
   * Parses a column from storage, orders and de-duplicates the data points.
   * When two appended cells share the same time offset, the one appended
   * later in the column (i.e. the newer write) replaces the earlier one.
   * The parsing will return both a Cell collection for debugging and add
   * the cells to concatenated qualifier and value arrays in the compacted data
   * point format so that the results can be merged with other non-append
   * columns or rows.
   * <p>
   * WARNING: If the "tsd.core.repair_appends" config is set to true then this
   * method will issue puts against the database, overwriting the column with
   * sorted and de-duplicated data. It will only do this for rows that are at
   * least an hour old so as to avoid pounding current rows.
   * <p>
   * TODO (CL) - allow for newer or older data points depending on a config.
   * @param tsdb The TSDB to which we belong
   * @param kv The key value to parse
   * @return The individual cells in time-offset order, after de-duplication.
   * @throws IllegalArgumentException if the given KV is not an append column
   * or we were unable to parse the value.
   * @throws IllegalDataException if the row key or the concatenated value is
   * corrupted and cannot be broken down into individual data points.
   */
  public final Collection<Cell> parseKeyValue(final TSDB tsdb, final KeyValue kv) {
    if (!isAppendDataPoints(kv.qualifier())) {
      // it's really not an issue if the offset is not 0, maybe in the future
      // we'll support appends at different offsets.
      throw new IllegalArgumentException("Can not parse cell, it is not "
          + "an appended cell. It has a different qualifier "
          + Bytes.pretty(kv.qualifier()) + ", row key " + Bytes.pretty(kv.key()));
    }
    final boolean repair = tsdb.getConfig().repair_appends();
    final long base_time;
    try {
      base_time = Internal.baseTime(tsdb, kv.key());
    } catch (ArrayIndexOutOfBoundsException oob) {
      throw new IllegalDataException("Corrupted value: invalid row key: " + kv,
          oob);
    }

    int val_idx = 0;       // read cursor into the concatenated value array
    int val_length = 0;    // total bytes of de-duped values
    int qual_length = 0;   // total bytes of de-duped qualifiers
    int last_delta = -1;   // Time delta, extracted from the qualifier.

    // TreeMap keyed on the time offset so iteration yields sorted cells.
    final Map<Integer, Internal.Cell> deltas = new TreeMap<Integer, Cell>();
    boolean has_duplicates = false;
    boolean out_of_order = false;
    boolean needs_repair = false;

    try {
      while (val_idx < kv.value().length) {
        final byte[] q = Internal.extractQualifier(kv.value(), val_idx);
        // NOTE(review): this copy looks redundant if extractQualifier already
        // returns the populated qualifier bytes — confirm against Internal
        // before removing.
        System.arraycopy(kv.value(), val_idx, q, 0, q.length);
        val_idx += q.length;

        final int vlen = Internal.getValueLengthFromQualifier(q, 0);
        final byte[] v = new byte[vlen];
        System.arraycopy(kv.value(), val_idx, v, 0, vlen);
        val_idx += vlen;
        final int delta = Internal.getOffsetFromQualifier(q);

        final Cell duplicate = deltas.get(delta);
        if (duplicate != null) {
          // This is a duplicate cell; back out the earlier cell's byte counts
          // since the put() below will replace it with this newer cell.
          has_duplicates = true;
          qual_length -= duplicate.qualifier.length;
          val_length -= duplicate.value.length;
        }

        qual_length += q.length;
        val_length += vlen;
        final Cell cell = new Cell(q, v);
        deltas.put(delta, cell);

        if (!out_of_order) {
          // Data points needs to be sorted if we find at least one out of
          // order data
          if (delta <= last_delta) {
            out_of_order = true;
          }
          last_delta = delta;
        }
      }
    } catch (ArrayIndexOutOfBoundsException oob) {
      throw new IllegalDataException("Corrupted value: couldn't break down"
          + " into individual values (consumed " + val_idx + " bytes, but was"
          + " expecting to consume " + (kv.value().length) + "): " + kv
          + ", cells so far: " + deltas.values(), oob);
    }

    if (has_duplicates || out_of_order) {
      // Only repair rows older than the threshold to avoid pounding rows
      // that are still actively being written to.
      if ((DateTime.currentTimeMillis() / 1000) - base_time > REPAIR_THRESHOLD) {
        needs_repair = true;
      }
    }

    // Check we consumed all the bytes of the value.
    if (val_idx != kv.value().length) {
      throw new IllegalDataException("Corrupted value: couldn't break down"
          + " into individual values (consumed " + val_idx + " bytes, but was"
          + " expecting to consume " + (kv.value().length) + "): " + kv
          + ", cells so far: " + deltas.values());
    }

    val_idx = 0;
    int qual_idx = 0;
    byte[] healed_cell = null;
    int healed_index = 0;

    this.value = new byte[val_length];
    this.qualifier = new byte[qual_length];

    if (repair && needs_repair) {
      healed_cell = new byte[val_length + qual_length];
    }

    // Rebuild the compacted qualifier/value arrays (and, if repairing, the
    // healed column) from the sorted, de-duped cells.
    for (final Cell cell : deltas.values()) {
      System.arraycopy(cell.qualifier, 0, this.qualifier, qual_idx,
          cell.qualifier.length);
      qual_idx += cell.qualifier.length;
      System.arraycopy(cell.value, 0, this.value, val_idx, cell.value.length);
      val_idx += cell.value.length;

      if (repair && needs_repair) {
        System.arraycopy(cell.qualifier, 0, healed_cell, healed_index,
            cell.qualifier.length);
        healed_index += cell.qualifier.length;
        System.arraycopy(cell.value, 0, healed_cell, healed_index,
            cell.value.length);
        healed_index += cell.value.length;
      }
    }

    if (repair && needs_repair) {
      LOG.debug("Repairing appended data column {}", kv);
      final PutRequest put = new PutRequest(tsdb.table, kv.key(),
          TSDB.FAMILY(), kv.qualifier(), healed_cell);
      repaired_deferred = tsdb.getClient().put(put);
    }

    return deltas.values();
  }

  /** @return the sorted qualifier in a compacted data point format after
   * {@link #parseKeyValue(TSDB, KeyValue)} has been called */
  public byte[] qualifier() {
    return qualifier;
  }

  /** @return the sorted value in a compacted data point format after
   * {@link #parseKeyValue(TSDB, KeyValue)} has been called */
  public byte[] value() {
    return value;
  }

  /** @return a deferred to wait on if the call to
   * {@link #parseKeyValue(TSDB, KeyValue)} triggered a put to storage. */
  public Deferred<Object> repairedDeferred() {
    return repaired_deferred;
  }

  /** @return whether or not the qualifier marks an AppendDataPoints column,
   * i.e. it is exactly 3 bytes long and starts with
   * {@link #APPEND_COLUMN_PREFIX}. */
  public static boolean isAppendDataPoints(byte[] qualifier) {
    return qualifier != null && qualifier.length == 3
        && qualifier[0] == APPEND_COLUMN_PREFIX;
  }
}