// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

import { Table } from '../table';
import { Vector } from '../vector';
import { IntVector } from '../vector/int';
import { Field, Schema } from '../schema';
import { Predicate, Col } from './predicate';
import { RecordBatch } from '../recordbatch';
import { VectorType as V } from '../interfaces';
import { DataType, Int, Struct, Dictionary } from '../type';

/** @ignore Callback invoked once per record batch before its rows are visited. */
export type BindFunc = (batch: RecordBatch) => void;
/** @ignore Callback invoked for each visited row index within `batch`. */
export type NextFunc = (idx: number, batch: RecordBatch) => void;

// Expose the DataFrame query methods on every Table by delegating to a
// DataFrame built over the table's chunks.
Table.prototype.countBy = function(this: Table, name: Col | string) { return new DataFrame(this.chunks).countBy(name); };
Table.prototype.scan = function(this: Table, next: NextFunc, bind?: BindFunc) { return new DataFrame(this.chunks).scan(next, bind); };
Table.prototype.scanReverse = function(this: Table, next: NextFunc, bind?: BindFunc) { return new DataFrame(this.chunks).scanReverse(next, bind); };
Table.prototype.filter = function(this: Table, predicate: Predicate): FilteredDataFrame { return new DataFrame(this.chunks).filter(predicate); };

/**
 * @ignore
 * Resolve the `countBy` column and bind it to the last batch. All dictionary
 * batches are assumed to be deltas, so the last batch is expected to carry
 * the most complete dictionary.
 * @throws Error if the column is not dictionary-encoded.
 */
function bindCountByColumn(batches: RecordBatch[], name: Col | string) {
    const column = typeof name === 'string' ? new Col(name) : name;
    column.bind(batches[batches.length - 1]);
    const vector = column.vector as V<Dictionary>;
    if (!DataType.isDictionary(vector.type)) {
        throw new Error('countBy currently only supports dictionary-encoded columns');
    }
    return { column, vector };
}

/**
 * @ignore
 * Allocate a zeroed counts array with one slot per dictionary entry.
 * The element width is chosen from the total number of rows across ALL
 * batches. No key can be counted more often than that, so the counters
 * cannot wrap. (The width used to be derived from the last batch's
 * length alone, which let multi-batch tables overflow.)
 */
function allocateCounts(batches: RecordBatch[], dictionaryLength: number) {
    const totalRows = batches.reduce((sum, batch) => sum + batch.length, 0);
    const CountsArrayType = totalRows <= 0xFF ? Uint8Array :
                            totalRows <= 0xFFFF ? Uint16Array : Uint32Array;
    return new CountsArrayType(dictionaryLength);
}

/** A Table with row-scanning, filtering, and counting helpers. */
export class DataFrame<T extends { [key: string]: DataType } = any> extends Table<T> {
    /**
     * Create a lazily-filtered view over this frame's batches.
     * @param predicate rows for which this returns true are kept
     */
    public filter(predicate: Predicate): FilteredDataFrame<T> {
        return new FilteredDataFrame<T>(this.chunks, predicate);
    }
    /**
     * Visit every row index of every batch in forward order.
     * @param next called with each row index and its batch
     * @param bind if given, called once per batch before any of its rows
     */
    public scan(next: NextFunc, bind?: BindFunc) {
        const batches = this.chunks, numBatches = batches.length;
        for (let batchIndex = -1; ++batchIndex < numBatches;) {
            // load batches
            const batch = batches[batchIndex];
            if (bind) { bind(batch); }
            // yield all indices
            for (let index = -1, numRows = batch.length; ++index < numRows;) {
                next(index, batch);
            }
        }
    }
    /**
     * Visit every row index of every batch in reverse order (last batch
     * first, last row first).
     * @param next called with each row index and its batch
     * @param bind if given, called once per batch before any of its rows
     */
    public scanReverse(next: NextFunc, bind?: BindFunc) {
        const batches = this.chunks, numBatches = batches.length;
        for (let batchIndex = numBatches; --batchIndex >= 0;) {
            // load batches
            const batch = batches[batchIndex];
            if (bind) { bind(batch); }
            // yield all indices
            for (let index = batch.length; --index >= 0;) {
                next(index, batch);
            }
        }
    }
    /**
     * Count the rows holding each value of a dictionary-encoded column.
     * Rows whose key is null are skipped.
     * @param name the column (or its name) to count by
     * @returns a table pairing each dictionary value with its count
     * @throws Error if the column is not dictionary-encoded
     */
    public countBy(name: Col | string) {
        const batches = this.chunks, numBatches = batches.length;
        const { column, vector } = bindCountByColumn(batches, name);
        const counts = allocateCounts(batches, vector.dictionary.length);
        for (let batchIndex = -1; ++batchIndex < numBatches;) {
            // load batches
            const batch = batches[batchIndex];
            // rebind the countBy Col so `indices` refers to this batch's keys
            column.bind(batch);
            const keys = (column.vector as V<Dictionary>).indices;
            for (let index = -1, numRows = batch.length; ++index < numRows;) {
                const key = keys.get(index);
                if (key !== null) { counts[key]++; }
            }
        }
        return new CountByResult(vector.dictionary, IntVector.from(counts));
    }
}

/**
 * @ignore
 * Two-column table (`values`, `counts`) produced by `countBy`.
 */
export class CountByResult<T extends DataType = any, TCount extends Int = Int> extends Table<{ values: T, counts: TCount }> {
    /**
     * @param values the distinct values (dictionary entries)
     * @param counts the count for each entry; its length is used as the row count
     */
    constructor(values: Vector<T>, counts: V<TCount>) {
        type R = { values: T, counts: TCount };
        const schema = new Schema<R>([
            new Field('values', values.type),
            new Field('counts', counts.type)
        ]);
        super(new RecordBatch<R>(schema, counts.length, [values, counts]));
    }
    /** Convert to a plain object mapping each value (as a property key) to its count. */
    public toJSON(): Object {
        const values = this.getColumnAt(0)!;
        const counts = this.getColumnAt(1)!;
        const result = {} as { [k: string]: number | null };
        for (let i = -1; ++i < this.length;) {
            result[values.get(i)] = counts.get(i);
        }
        return result;
    }
}

/**
 * @ignore
 * A DataFrame view whose scans, counts, and iteration only visit rows
 * matching a predicate.
 */
export class FilteredDataFrame<T extends { [key: string]: DataType } = any> extends DataFrame<T> {
    private _predicate: Predicate;
    constructor (batches: RecordBatch<T>[], predicate: Predicate) {
        super(batches);
        this._predicate = predicate;
    }
    /**
     * Visit matching rows in forward order. `bind` is called lazily: only for
     * batches that contain at least one matching row, just before the first
     * matching row is passed to `next`.
     */
    public scan(next: NextFunc, bind?: BindFunc) {
        // inlined version of this:
        // this.parent.scan((idx, columns) => {
        //     if (this.predicate(idx, columns)) next(idx, columns);
        // });
        const batches = this._chunks;
        const numBatches = batches.length;
        for (let batchIndex = -1; ++batchIndex < numBatches;) {
            // load batches
            const batch = batches[batchIndex];
            const predicate = this._predicate.bind(batch);
            let isBound = false;
            // yield all indices
            for (let index = -1, numRows = batch.length; ++index < numRows;) {
                if (predicate(index, batch)) {
                    // bind batches lazily - if predicate doesn't match anything
                    // in the batch we don't need to call bind on the batch
                    if (bind && !isBound) {
                        bind(batch);
                        isBound = true;
                    }
                    next(index, batch);
                }
            }
        }
    }
    /** Like `scan`, but visits batches and rows in reverse order. */
    public scanReverse(next: NextFunc, bind?: BindFunc) {
        const batches = this._chunks;
        const numBatches = batches.length;
        for (let batchIndex = numBatches; --batchIndex >= 0;) {
            // load batches
            const batch = batches[batchIndex];
            const predicate = this._predicate.bind(batch);
            let isBound = false;
            // yield all indices
            for (let index = batch.length; --index >= 0;) {
                if (predicate(index, batch)) {
                    // bind batches lazily - if predicate doesn't match anything
                    // in the batch we don't need to call bind on the batch
                    if (bind && !isBound) {
                        bind(batch);
                        isBound = true;
                    }
                    next(index, batch);
                }
            }
        }
    }
    /** @returns the number of rows matching the predicate. */
    public count(): number {
        // inlined version of this:
        // let sum = 0;
        // this.parent.scan((idx, columns) => {
        //     if (this.predicate(idx, columns)) ++sum;
        // });
        // return sum;
        let sum = 0;
        const batches = this._chunks;
        const numBatches = batches.length;
        for (let batchIndex = -1; ++batchIndex < numBatches;) {
            // load batches
            const batch = batches[batchIndex];
            const predicate = this._predicate.bind(batch);
            // yield all indices
            for (let index = -1, numRows = batch.length; ++index < numRows;) {
                if (predicate(index, batch)) { ++sum; }
            }
        }
        return sum;
    }
    /** Yield each matching row (as returned by `batch.get`) in forward order. */
    public *[Symbol.iterator](): IterableIterator<Struct<T>['TValue']> {
        // inlined version of this:
        // this.parent.scan((idx, columns) => {
        //     if (this.predicate(idx, columns)) next(idx, columns);
        // });
        const batches = this._chunks;
        const numBatches = batches.length;
        for (let batchIndex = -1; ++batchIndex < numBatches;) {
            // load batches
            const batch = batches[batchIndex];
            // TODO: bind batches lazily
            // If predicate doesn't match anything in the batch we don't need
            // to bind the callback
            const predicate = this._predicate.bind(batch);
            // yield all indices
            for (let index = -1, numRows = batch.length; ++index < numRows;) {
                if (predicate(index, batch)) { yield batch.get(index) as any; }
            }
        }
    }
    /** Narrow this view further by AND-ing `predicate` with the current one. */
    public filter(predicate: Predicate): FilteredDataFrame<T> {
        return new FilteredDataFrame<T>(
            this._chunks,
            this._predicate.and(predicate)
        );
    }
    /**
     * Count the rows that match the predicate, grouped by each value of a
     * dictionary-encoded column. Rows whose key is null are skipped.
     * @param name the column (or its name) to count by
     * @returns a table pairing each dictionary value with its count
     * @throws Error if the column is not dictionary-encoded
     */
    public countBy(name: Col | string) {
        const batches = this._chunks, numBatches = batches.length;
        const { column, vector } = bindCountByColumn(batches, name);
        const counts = allocateCounts(batches, vector.dictionary.length);
        for (let batchIndex = -1; ++batchIndex < numBatches;) {
            // load batches
            const batch = batches[batchIndex];
            const predicate = this._predicate.bind(batch);
            // rebind the countBy Col so `indices` refers to this batch's keys
            column.bind(batch);
            const keys = (column.vector as V<Dictionary>).indices;
            for (let index = -1, numRows = batch.length; ++index < numRows;) {
                const key = keys.get(index);
                if (key !== null && predicate(index, batch)) { counts[key]++; }
            }
        }
        return new CountByResult(vector.dictionary, IntVector.from(counts));
    }
}