1import { Field, DataFrame, FieldType, Labels, QueryResultMeta } from '../types';
2import { ArrayVector } from '../vector';
3import { DataFrameJSON, decodeFieldValueEntities, FieldSchema } from './DataFrameJSON';
4import { guessFieldTypeFromValue } from './processDataFrame';
5import { join } from '../transformations/transformers/joinDataFrames';
6import { AlignedData } from 'uplot';
7
8/**
9 * Indicate if the frame is appened or replace
10 *
11 * @public -- but runtime
12 */
13export enum StreamingFrameAction {
14  Append = 'append',
15  Replace = 'replace',
16}
17
18/**
19 * Stream packet info is attached to StreamingDataFrames and indicate how many
20 * rows were added to the end of the frame.  The number of discarded rows can be
21 * calculated from previous state
22 *
23 * @public -- but runtime
24 */
25export interface StreamPacketInfo {
26  number: number;
27  action: StreamingFrameAction;
28  length: number;
29}
30
31/**
32 * @alpha
33 */
34export interface StreamingFrameOptions {
35  maxLength?: number; // 1000
36  maxDelta?: number; // how long to keep things
37  action?: StreamingFrameAction; // default will append
38}
39
40enum PushMode {
41  wide,
42  labels,
43  // long
44}
45
46/**
47 * Unlike a circular buffer, this will append and periodically slice the front
48 *
49 * @alpha
50 */
51export class StreamingDataFrame implements DataFrame {
52  name?: string;
53  refId?: string;
54  meta: QueryResultMeta = {};
55
56  fields: Array<Field<any, ArrayVector<any>>> = [];
57  length = 0;
58
59  options: StreamingFrameOptions;
60
61  private schemaFields: FieldSchema[] = [];
62  private timeFieldIndex = -1;
63  private pushMode = PushMode.wide;
64  private alwaysReplace = false;
65
66  // current labels
67  private labels: Set<string> = new Set();
68  readonly packetInfo: StreamPacketInfo = {
69    number: 0,
70    action: StreamingFrameAction.Replace,
71    length: 0,
72  };
73
74  constructor(frame: DataFrameJSON, opts?: StreamingFrameOptions) {
75    this.options = {
76      maxLength: 1000,
77      maxDelta: Infinity,
78      ...opts,
79    };
80    this.alwaysReplace = this.options.action === StreamingFrameAction.Replace;
81
82    this.push(frame);
83  }
84
85  /**
86   * apply the new message to the existing data.  This will replace the existing schema
87   * if a new schema is included in the message, or append data matching the current schema
88   */
89  push(msg: DataFrameJSON) {
90    const { schema, data } = msg;
91
92    this.packetInfo.number++;
93
94    if (schema) {
95      this.pushMode = PushMode.wide;
96      this.timeFieldIndex = schema.fields.findIndex((f) => f.type === FieldType.time);
97      if (
98        this.timeFieldIndex === 1 &&
99        schema.fields[0].name === 'labels' &&
100        schema.fields[0].type === FieldType.string
101      ) {
102        this.pushMode = PushMode.labels;
103        this.timeFieldIndex = 0; // after labels are removed!
104      }
105
106      const niceSchemaFields = this.pushMode === PushMode.labels ? schema.fields.slice(1) : schema.fields;
107
108      this.refId = schema.refId;
109      if (schema.meta) {
110        this.meta = { ...schema.meta };
111      }
112
113      if (hasSameStructure(this.schemaFields, niceSchemaFields)) {
114        const len = niceSchemaFields.length;
115        this.fields.forEach((f, idx) => {
116          const sf = niceSchemaFields[idx % len];
117          f.config = sf.config ?? {};
118          f.labels = sf.labels;
119        });
120      } else {
121        const isWide = this.pushMode === PushMode.wide;
122        this.fields = niceSchemaFields.map((f) => {
123          return {
124            config: f.config ?? {},
125            name: f.name,
126            labels: f.labels,
127            type: f.type ?? FieldType.other,
128            // transfer old values by type & name, unless we relied on labels to match fields
129            values: isWide
130              ? this.fields.find((of) => of.name === f.name && f.type === of.type)?.values ?? new ArrayVector()
131              : new ArrayVector(),
132          };
133        });
134      }
135
136      this.schemaFields = niceSchemaFields;
137    }
138
139    if (data && data.values.length && data.values[0].length) {
140      let { values, entities } = data;
141
142      if (entities) {
143        entities.forEach((ents, i) => {
144          if (ents) {
145            decodeFieldValueEntities(ents, values[i]);
146            // TODO: append replacements to field
147          }
148        });
149      }
150
151      if (this.pushMode === PushMode.labels) {
152        // augment and transform data to match current schema for standard circPush() path
153        const labeledTables = transpose(values);
154
155        // make sure fields are initalized for each label
156        for (const label of labeledTables.keys()) {
157          if (!this.labels.has(label)) {
158            this.addLabel(label);
159          }
160        }
161
162        // TODO: cache higher up
163        let dummyTable = Array(this.schemaFields.length).fill([]);
164
165        let tables: AlignedData[] = [];
166        this.labels.forEach((label) => {
167          tables.push(labeledTables.get(label) ?? dummyTable);
168        });
169
170        values = join(tables);
171      }
172
173      if (values.length !== this.fields.length) {
174        if (this.fields.length) {
175          throw new Error(
176            `push message mismatch.  Expected: ${this.fields.length}, recieved: ${values.length} (labels=${
177              this.pushMode === PushMode.labels
178            })`
179          );
180        }
181
182        this.fields = values.map((vals, idx) => {
183          let name = `Field ${idx}`;
184          let type = guessFieldTypeFromValue(vals[0]);
185          const isTime = idx === 0 && type === FieldType.number && vals[0] > 1600016688632;
186          if (isTime) {
187            type = FieldType.time;
188            name = 'Time';
189          }
190
191          return {
192            name,
193            type,
194            config: {},
195            values: new ArrayVector([]),
196          };
197        });
198      }
199
200      let appended = values;
201      this.packetInfo.length = values[0].length;
202
203      if (this.alwaysReplace || !this.length) {
204        this.packetInfo.action = StreamingFrameAction.Replace;
205      } else {
206        this.packetInfo.action = StreamingFrameAction.Append;
207
208        // mutates appended
209        appended = this.fields.map((f) => f.values.buffer);
210        circPush(appended, values, this.options.maxLength, this.timeFieldIndex, this.options.maxDelta);
211      }
212
213      appended.forEach((v, i) => {
214        const { state, values } = this.fields[i];
215        values.buffer = v;
216        if (state) {
217          state.calcs = undefined;
218        }
219      });
220
221      // Update the frame length
222      this.length = appended[0].length;
223    }
224  }
225
226  // adds a set of fields for a new label
227  private addLabel(label: string) {
228    let labelCount = this.labels.size;
229
230    // parse labels
231    const parsedLabels: Labels = {};
232    if (label.length) {
233      label.split(',').forEach((kv) => {
234        const [key, val] = kv.trim().split('=');
235        parsedLabels[key] = val;
236      });
237    }
238
239    if (labelCount === 0) {
240      // mutate existing fields and add labels
241      this.fields.forEach((f, i) => {
242        if (i > 0) {
243          f.labels = parsedLabels;
244        }
245      });
246    } else {
247      for (let i = 1; i < this.schemaFields.length; i++) {
248        let proto = this.schemaFields[i] as Field;
249
250        this.fields.push({
251          ...proto,
252          config: proto.config ?? {},
253          labels: parsedLabels,
254          values: new ArrayVector(Array(this.length).fill(undefined)),
255        });
256      }
257    }
258
259    this.labels.add(label);
260  }
261}
262
263// converts vertical insertion records with table keys in [0] and column values in [1...N]
264// to join()-able tables with column arrays
265export function transpose(vrecs: any[][]) {
266  let tableKeys = new Set(vrecs[0]);
267  let tables = new Map();
268
269  tableKeys.forEach((key) => {
270    let cols = Array(vrecs.length - 1)
271      .fill(null)
272      .map(() => []);
273
274    tables.set(key, cols);
275  });
276
277  for (let r = 0; r < vrecs[0].length; r++) {
278    let table = tables.get(vrecs[0][r]);
279    for (let c = 1; c < vrecs.length; c++) {
280      table[c - 1].push(vrecs[c][r]);
281    }
282  }
283
284  return tables;
285}
286
287// binary search for index of closest value
288function closestIdx(num: number, arr: number[], lo?: number, hi?: number) {
289  let mid;
290  lo = lo || 0;
291  hi = hi || arr.length - 1;
292  let bitwise = hi <= 2147483647;
293
294  while (hi - lo > 1) {
295    mid = bitwise ? (lo + hi) >> 1 : Math.floor((lo + hi) / 2);
296
297    if (arr[mid] < num) {
298      lo = mid;
299    } else {
300      hi = mid;
301    }
302  }
303
304  if (num - arr[lo] <= arr[hi] - num) {
305    return lo;
306  }
307
308  return hi;
309}
310
311/**
312 * @internal // not exported in yet
313 */
314export function getLastStreamingDataFramePacket(frame: DataFrame) {
315  const pi = (frame as StreamingDataFrame).packetInfo;
316  return pi?.action ? pi : undefined;
317}
318
319// mutable circular push
320function circPush(data: number[][], newData: number[][], maxLength = Infinity, deltaIdx = 0, maxDelta = Infinity) {
321  for (let i = 0; i < data.length; i++) {
322    data[i] = data[i].concat(newData[i]);
323  }
324
325  const nlen = data[0].length;
326
327  let sliceIdx = 0;
328
329  if (nlen > maxLength) {
330    sliceIdx = nlen - maxLength;
331  }
332
333  if (maxDelta !== Infinity && deltaIdx >= 0) {
334    const deltaLookup = data[deltaIdx];
335
336    const low = deltaLookup[sliceIdx];
337    const high = deltaLookup[nlen - 1];
338
339    if (high - low > maxDelta) {
340      sliceIdx = closestIdx(high - maxDelta, deltaLookup, sliceIdx);
341    }
342  }
343
344  if (sliceIdx) {
345    for (let i = 0; i < data.length; i++) {
346      data[i] = data[i].slice(sliceIdx);
347    }
348  }
349
350  return sliceIdx;
351}
352
353function hasSameStructure(a: FieldSchema[], b: FieldSchema[]): boolean {
354  if (a?.length !== b.length) {
355    return false;
356  }
357  for (let i = 0; i < a.length; i++) {
358    const fA = a[i];
359    const fB = b[i];
360    if (fA.name !== fB.name || fA.type !== fB.type) {
361      return false;
362    }
363  }
364  return true;
365}
366