import { Field, DataFrame, FieldType, Labels, QueryResultMeta } from '../types';
import { ArrayVector } from '../vector';
import { DataFrameJSON, decodeFieldValueEntities, FieldSchema } from './DataFrameJSON';
import { guessFieldTypeFromValue } from './processDataFrame';
import { join } from '../transformations/transformers/joinDataFrames';
import { AlignedData } from 'uplot';

/**
 * Indicates whether the last packet was appended to the frame or replaced it
 *
 * @public -- but runtime
 */
export enum StreamingFrameAction {
  Append = 'append',
  Replace = 'replace',
}

/**
 * Stream packet info is attached to StreamingDataFrames and indicates how many
 * rows were added to the end of the frame. The number of discarded rows can be
 * calculated from previous state
 *
 * @public -- but runtime
 */
export interface StreamPacketInfo {
  /** Incremented once per push() call */
  number: number;
  /** Whether the last packet with data replaced the buffers or was appended */
  action: StreamingFrameAction;
  /** Row count of the last packet that carried data */
  length: number;
}

/**
 * @alpha
 */
export interface StreamingFrameOptions {
  maxLength?: number; // 1000
  maxDelta?: number; // how long to keep things
  action?: StreamingFrameAction; // default will append
}

enum PushMode {
  wide,
  labels,
  // long
}

/**
 * Unlike a circular buffer, this will append and periodically slice the front
 *
 * @alpha
 */
export class StreamingDataFrame implements DataFrame {
  name?: string;
  refId?: string;
  meta: QueryResultMeta = {};

  fields: Array<Field<any, ArrayVector<any>>> = [];
  length = 0;

  options: StreamingFrameOptions;

  private schemaFields: FieldSchema[] = [];
  private timeFieldIndex = -1;
  private pushMode = PushMode.wide;
  private alwaysReplace = false;

  // current labels
  private labels: Set<string> = new Set();
  readonly packetInfo: StreamPacketInfo = {
    number: 0,
    action: StreamingFrameAction.Replace,
    length: 0,
  };

  /**
   * @param frame - initial message; it is applied through push()
   * @param opts - overrides for the defaults (maxLength 1000, maxDelta Infinity)
   */
  constructor(frame: DataFrameJSON, opts?: StreamingFrameOptions) {
    this.options = {
      maxLength: 1000,
      maxDelta: Infinity,
      ...opts,
    };
    this.alwaysReplace = this.options.action === StreamingFrameAction.Replace;

    this.push(frame);
  }

  /**
   * apply the new message to the existing data. This will replace the existing schema
   * if a new schema is included in the message, or append data matching the current schema
   *
   * @throws Error when the number of value columns does not match the existing fields
   */
  push(msg: DataFrameJSON) {
    const { schema, data } = msg;

    this.packetInfo.number++;

    if (schema) {
      this.pushMode = PushMode.wide;
      this.timeFieldIndex = schema.fields.findIndex((f) => f.type === FieldType.time);
      if (
        this.timeFieldIndex === 1 &&
        schema.fields[0].name === 'labels' &&
        schema.fields[0].type === FieldType.string
      ) {
        this.pushMode = PushMode.labels;
        this.timeFieldIndex = 0; // after labels are removed!
      }

      const isWide = this.pushMode === PushMode.wide;
      const niceSchemaFields = isWide ? schema.fields : schema.fields.slice(1);

      this.refId = schema.refId;
      if (schema.meta) {
        this.meta = { ...schema.meta };
      }

      if (hasSameStructure(this.schemaFields, niceSchemaFields)) {
        const len = niceSchemaFields.length;
        // once labels were parsed from the stream they must survive a schema refresh
        const keepParsedLabels = !isWide && this.labels.size > 0;

        this.fields.forEach((f, idx) => {
          // in labels mode the value fields repeat once per label after the shared first field,
          // so fold the index back onto schema entries 1..len-1 rather than 0..len-1
          const schemaIdx = isWide || idx === 0 ? idx % len : ((idx - 1) % (len - 1)) + 1;
          const sf = niceSchemaFields[schemaIdx];
          if (!sf) {
            // empty schema: nothing to refresh from
            return;
          }
          f.config = sf.config ?? {};
          if (!keepParsedLabels || idx === 0) {
            f.labels = sf.labels;
          }
        });
      } else {
        const previousFields = this.fields;

        // transfer old values by type & name, unless we relied on labels to match fields
        const carriedValues = niceSchemaFields.map((f) =>
          isWide ? previousFields.find((of) => of.name === f.name && f.type === of.type)?.values : undefined
        );

        // the per-label field sets are gone after a rebuild, so they must be re-created by addLabel()
        this.labels.clear();

        // when no column survives, the frame starts over and the next packet is a replace
        if (!carriedValues.some((v) => v !== undefined)) {
          this.length = 0;
        }

        const rowCount = this.length;
        this.fields = niceSchemaFields.map((f, idx) => {
          return {
            config: f.config ?? {},
            name: f.name,
            labels: f.labels,
            type: f.type ?? FieldType.other,
            // new columns are backfilled so every column keeps the same number of rows
            values: carriedValues[idx] ?? new ArrayVector(Array(rowCount).fill(undefined)),
          };
        });
      }

      this.schemaFields = niceSchemaFields;
    }

    if (data && data.values.length && data.values[0].length) {
      let { values, entities } = data;

      if (entities) {
        entities.forEach((ents, i) => {
          if (ents) {
            decodeFieldValueEntities(ents, values[i]);
            // TODO: append replacements to field
          }
        });
      }

      if (this.pushMode === PushMode.labels) {
        // augment and transform data to match current schema for standard circPush() path
        const labeledTables = transpose(values);

        // make sure fields are initialized for each label
        for (const label of labeledTables.keys()) {
          if (!this.labels.has(label)) {
            this.addLabel(label);
          }
        }

        // TODO: cache higher up
        let dummyTable = Array(this.schemaFields.length).fill([]);

        let tables: AlignedData[] = [];
        this.labels.forEach((label) => {
          // labels absent from this packet contribute an empty table
          tables.push(labeledTables.get(label) ?? dummyTable);
        });

        values = join(tables);
      }

      if (values.length !== this.fields.length) {
        if (this.fields.length) {
          throw new Error(
            `push message mismatch. Expected: ${this.fields.length}, recieved: ${values.length} (labels=${
              this.pushMode === PushMode.labels
            })`
          );
        }

        // no fields yet (data arrived without a schema): guess them from the first row
        this.fields = values.map((vals, idx) => {
          let name = `Field ${idx}`;
          let type = guessFieldTypeFromValue(vals[0]);
          const isTime = idx === 0 && type === FieldType.number && vals[0] > 1600016688632;
          if (isTime) {
            type = FieldType.time;
            name = 'Time';
          }

          return {
            name,
            type,
            config: {},
            values: new ArrayVector([]),
          };
        });
      }

      let appended = values;
      this.packetInfo.length = values[0].length;

      if (this.alwaysReplace || !this.length) {
        this.packetInfo.action = StreamingFrameAction.Replace;
      } else {
        this.packetInfo.action = StreamingFrameAction.Append;

        // mutates appended
        appended = this.fields.map((f) => f.values.buffer);
        circPush(appended, values, this.options.maxLength, this.timeFieldIndex, this.options.maxDelta);
      }

      appended.forEach((v, i) => {
        const { state, values } = this.fields[i];
        values.buffer = v;
        if (state) {
          // cached calculations are stale once the buffer changes
          state.calcs = undefined;
        }
      });

      // Update the frame length
      this.length = appended[0].length;
    }
  }

  // adds a set of fields for a new label
  private addLabel(label: string) {
    const labelCount = this.labels.size;

    // parse labels
    const parsedLabels: Labels = {};
    if (label.length) {
      label.split(',').forEach((kv) => {
        // only the first '=' separates key from value, so values may themselves contain '='
        const [key, ...rest] = kv.trim().split('=');
        parsedLabels[key] = rest.length > 1 ? rest.join('=') : rest[0];
      });
    }

    if (labelCount === 0) {
      // mutate existing fields and add labels
      this.fields.forEach((f, i) => {
        if (i > 0) {
          f.labels = parsedLabels;
        }
      });
    } else {
      // every schema field except the first gets its own copy for this label
      for (let i = 1; i < this.schemaFields.length; i++) {
        const proto = this.schemaFields[i];

        this.fields.push({
          ...proto,
          // same fallbacks as the fields created from the schema in push()
          type: proto.type ?? FieldType.other,
          config: proto.config ?? {},
          labels: parsedLabels,
          // backfill so the new columns line up with the rows already in the frame
          values: new ArrayVector(Array(this.length).fill(undefined)),
        });
      }
    }

    this.labels.add(label);
  }
}

// converts vertical insertion records with table keys in [0] and column values in [1...N]
// to join()-able tables with column arrays
export function transpose(vrecs: any[][]) {
  // an empty record set yields an empty map
  const keyColumn: any[] = vrecs[0] ?? [];
  let tableKeys = new Set(keyColumn);
  let tables = new Map();

  tableKeys.forEach((key) => {
    let cols = Array(vrecs.length - 1)
      .fill(null)
      .map(() => []);

    tables.set(key, cols);
  });

  for (let r = 0; r < keyColumn.length; r++) {
    let table = tables.get(keyColumn[r]);
    for (let c = 1; c < vrecs.length; c++) {
      table[c - 1].push(vrecs[c][r]);
    }
  }

  return tables;
}

// binary search for index of closest value
function closestIdx(num: number, arr: number[], lo?: number, hi?: number) {
  let mid;
  // `??` rather than `||` so an explicit bound of 0 is respected
  lo = lo ?? 0;
  hi = hi ?? arr.length - 1;
  let bitwise = hi <= 2147483647;

  while (hi - lo > 1) {
    mid = bitwise ? (lo + hi) >> 1 : Math.floor((lo + hi) / 2);

    if (arr[mid] < num) {
      lo = mid;
    } else {
      hi = mid;
    }
  }

  if (num - arr[lo] <= arr[hi] - num) {
    return lo;
  }

  return hi;
}

/**
 * Returns the packet info of a frame when it has one with an action set, otherwise undefined
 *
 * @internal // not exported in yet
 */
export function getLastStreamingDataFramePacket(frame: DataFrame) {
  const pi = (frame as StreamingDataFrame).packetInfo;
  return pi?.action ? pi : undefined;
}

// mutable circular push: appends newData to each column of data, then drops rows from the
// front so that at most maxLength rows remain and, when maxDelta is finite, so that the
// column at deltaIdx spans roughly maxDelta. Returns the number of rows dropped.
function circPush(data: number[][], newData: number[][], maxLength = Infinity, deltaIdx = 0, maxDelta = Infinity) {
  for (let i = 0; i < data.length; i++) {
    data[i] = data[i].concat(newData[i]);
  }

  const nlen = data[0].length;

  let sliceIdx = 0;

  if (nlen > maxLength) {
    sliceIdx = nlen - maxLength;
  }

  if (maxDelta !== Infinity && deltaIdx >= 0) {
    const deltaLookup = data[deltaIdx];

    const low = deltaLookup[sliceIdx];
    const high = deltaLookup[nlen - 1];

    if (high - low > maxDelta) {
      sliceIdx = closestIdx(high - maxDelta, deltaLookup, sliceIdx);
    }
  }

  if (sliceIdx) {
    for (let i = 0; i < data.length; i++) {
      data[i] = data[i].slice(sliceIdx);
    }
  }

  return sliceIdx;
}

// true when both schemas have the same number of fields with pairwise equal name and type
function hasSameStructure(a: FieldSchema[], b: FieldSchema[]): boolean {
  if (a?.length !== b.length) {
    return false;
  }
  for (let i = 0; i < a.length; i++) {
    const fA = a[i];
    const fB = b[i];
    if (fA.name !== fB.name || fA.type !== fB.type) {
      return false;
    }
  }
  return true;
}