1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18import { Table } from '../table';
19import { Vector } from '../vector';
20import { IntVector } from '../vector/int';
21import { Field, Schema } from '../schema';
22import { Predicate, Col } from './predicate';
23import { RecordBatch } from '../recordbatch';
24import { VectorType as V } from '../interfaces';
25import { DataType, Int, Struct, Dictionary } from '../type';
26
/**
 * Per-batch hook accepted by the scan methods: it receives a record batch
 * before rows of that batch are handed to the `NextFunc`.
 * @ignore
 */
export type BindFunc = (batch: RecordBatch) => void;
/**
 * Per-row callback accepted by the scan methods: it receives the row's index
 * within its record batch, together with that batch.
 * @ignore
 */
export type NextFunc = (idx: number, batch: RecordBatch) => void;

// Install the DataFrame operations on Table. Each one wraps the table's
// chunks in a fresh DataFrame and forwards the call to it.
Table.prototype.countBy = function(this: Table, name: Col | string) {
    const frame = new DataFrame(this.chunks);
    return frame.countBy(name);
};
Table.prototype.scan = function(this: Table, next: NextFunc, bind?: BindFunc) {
    const frame = new DataFrame(this.chunks);
    return frame.scan(next, bind);
};
Table.prototype.scanReverse = function(this: Table, next: NextFunc, bind?: BindFunc) {
    const frame = new DataFrame(this.chunks);
    return frame.scanReverse(next, bind);
};
Table.prototype.filter = function(this: Table, predicate: Predicate): FilteredDataFrame {
    const frame = new DataFrame(this.chunks);
    return frame.filter(predicate);
};
36
/**
 * A Table that adds row scanning, predicate filtering and dictionary
 * count-by helpers on top of its record batches (`this.chunks`).
 */
export class DataFrame<T extends { [key: string]: DataType } = any> extends Table<T> {
    /**
     * Create a FilteredDataFrame over this frame's chunks that is restricted
     * by `predicate`.
     *
     * @param predicate The predicate handed to the FilteredDataFrame.
     * @returns A new FilteredDataFrame; this frame is not modified.
     */
    public filter(predicate: Predicate): FilteredDataFrame<T> {
        return new FilteredDataFrame<T>(this.chunks, predicate);
    }
    /**
     * Visit every row, batch by batch, in ascending order.
     *
     * @param next Called for each row with the row index (relative to its
     *             batch) and the batch itself.
     * @param bind Optional hook called once with each batch before any of
     *             that batch's rows are passed to `next`.
     */
    public scan(next: NextFunc, bind?: BindFunc) {
        const batches = this.chunks, numBatches = batches.length;
        for (let batchIndex = -1; ++batchIndex < numBatches;) {
            // load batches
            const batch = batches[batchIndex];
            if (bind) { bind(batch); }
            // yield all indices
            for (let index = -1, numRows = batch.length; ++index < numRows;) {
                next(index, batch);
            }
        }
    }
    /**
     * Visit every row in descending order: last batch first, and within each
     * batch from the last row to the first.
     *
     * @param next Called for each row with the row index (relative to its
     *             batch) and the batch itself.
     * @param bind Optional hook called once with each batch before any of
     *             that batch's rows are passed to `next`.
     */
    public scanReverse(next: NextFunc, bind?: BindFunc) {
        const batches = this.chunks, numBatches = batches.length;
        for (let batchIndex = numBatches; --batchIndex >= 0;) {
            // load batches
            const batch = batches[batchIndex];
            if (bind) { bind(batch); }
            // yield all indices
            for (let index = batch.length; --index >= 0;) {
                next(index, batch);
            }
        }
    }
    /**
     * Count how many rows reference each dictionary entry of a
     * dictionary-encoded column. Rows whose key is null are not counted.
     *
     * @param name The column to count by, as a `Col` or a column name.
     * @returns A CountByResult pairing the column's dictionary with one
     *          counter per dictionary entry.
     * @throws Error if the column is not dictionary-encoded.
     */
    public countBy(name: Col | string) {
        const batches = this.chunks, numBatches = batches.length;
        const countCol = typeof name === 'string' ? new Col(name) : name as Col;
        // Assume that all dictionary batches are deltas, which means that the
        // last record batch has the most complete dictionary.
        // NOTE(review): with zero batches this binds against `undefined`;
        // what happens then depends on Col.bind.
        countCol.bind(batches[numBatches - 1]);
        const vector = countCol.vector as V<Dictionary>;
        if (!DataType.isDictionary(vector.type)) {
            throw new Error('countBy currently only supports dictionary-encoded columns');
        }

        // A row contributes to at most one counter, so the total number of
        // rows across *all* batches is an upper bound for every counter.
        let totalRows = 0;
        for (let batchIndex = -1; ++batchIndex < numBatches;) {
            totalRows += batches[batchIndex].length;
        }
        // Pick the narrowest unsigned array whose maximum value can still
        // hold that bound, so counters never wrap around.
        const CountsArrayType = totalRows > 0xFFFF ? Uint32Array :
                                totalRows > 0xFF ? Uint16Array : Uint8Array;

        const counts = new CountsArrayType(vector.dictionary.length);
        for (let batchIndex = -1; ++batchIndex < numBatches;) {
            // load batches
            const batch = batches[batchIndex];
            // rebind the countBy Col so its vector refers to this batch
            countCol.bind(batch);
            const keys = (countCol.vector as V<Dictionary>).indices;
            // tally the dictionary key of every row
            for (let index = -1, numRows = batch.length; ++index < numRows;) {
                const key = keys.get(index);
                if (key !== null) { counts[key]++; }
            }
        }
        return new CountByResult(vector.dictionary, IntVector.from(counts));
    }
}
96
/**
 * Two-column Table returned by `countBy`: column 0 (`values`) holds the
 * values that were counted and column 1 (`counts`) holds the count for the
 * value at the same row.
 * @ignore
 */
export class CountByResult<T extends DataType = any, TCount extends Int = Int> extends Table<{ values: T,  counts: TCount }> {
    /**
     * @param values Vector of the values that were counted.
     * @param counts Vector of counts; its length becomes the row count of
     *               the single record batch backing this table.
     */
    constructor(values: Vector<T>, counts: V<TCount>) {
        type R = { values: T, counts: TCount };
        const schema = new Schema<R>([
            new Field('values', values.type),
            new Field('counts', counts.type)
        ]);
        const batch = new RecordBatch<R>(schema, counts.length, [values, counts]);
        super(batch);
    }
    /**
     * Build a plain object that maps each entry of the `values` column to
     * the entry of the `counts` column at the same row.
     */
    public toJSON(): Object {
        const valueColumn = this.getColumnAt(0)!;
        const countColumn = this.getColumnAt(1)!;
        const json: { [k: string]: number | null } = {};
        for (let row = 0, numRows = this.length; row < numRows; ++row) {
            json[valueColumn.get(row)] = countColumn.get(row);
        }
        return json;
    }
}
117
/**
 * A DataFrame whose scans, counts and iteration only report the rows that
 * are accepted by a predicate.
 * @ignore
 */
export class FilteredDataFrame<T extends { [key: string]: DataType } = any> extends DataFrame<T> {
    private _predicate: Predicate;
    /**
     * @param batches   The record batches to filter.
     * @param predicate The predicate that decides which rows are reported.
     */
    constructor (batches: RecordBatch<T>[], predicate: Predicate) {
        super(batches);
        this._predicate = predicate;
    }
    /**
     * Visit every row accepted by the predicate, in ascending order.
     *
     * @param next Called for each accepted row with the row index (relative
     *             to its batch) and the batch itself.
     * @param bind Optional hook called at most once per batch, right before
     *             the first accepted row of that batch is passed to `next`.
     *             It is not called for batches without accepted rows.
     */
    public scan(next: NextFunc, bind?: BindFunc) {
        // inlined version of this:
        // this.parent.scan((idx, columns) => {
        //     if (this.predicate(idx, columns)) next(idx, columns);
        // });
        const batches = this._chunks;
        const numBatches = batches.length;
        for (let batchIndex = -1; ++batchIndex < numBatches;) {
            // load batches
            const batch = batches[batchIndex];
            const predicate = this._predicate.bind(batch);
            let isBound = false;
            // yield all indices
            for (let index = -1, numRows = batch.length; ++index < numRows;) {
                if (predicate(index, batch)) {
                    // bind batches lazily - if predicate doesn't match anything
                    // in the batch we don't need to call bind on the batch
                    if (bind && !isBound) {
                        bind(batch);
                        isBound = true;
                    }
                    next(index, batch);
                }
            }
        }
    }
    /**
     * Visit every row accepted by the predicate in descending order: last
     * batch first, and within each batch from the last row to the first.
     *
     * @param next Called for each accepted row with the row index (relative
     *             to its batch) and the batch itself.
     * @param bind Optional hook called at most once per batch, right before
     *             the first accepted row of that batch is passed to `next`.
     *             It is not called for batches without accepted rows.
     */
    public scanReverse(next: NextFunc, bind?: BindFunc) {
        const batches = this._chunks;
        const numBatches = batches.length;
        for (let batchIndex = numBatches; --batchIndex >= 0;) {
            // load batches
            const batch = batches[batchIndex];
            const predicate = this._predicate.bind(batch);
            let isBound = false;
            // yield all indices
            for (let index = batch.length; --index >= 0;) {
                if (predicate(index, batch)) {
                    // bind batches lazily - if predicate doesn't match anything
                    // in the batch we don't need to call bind on the batch
                    if (bind && !isBound) {
                        bind(batch);
                        isBound = true;
                    }
                    next(index, batch);
                }
            }
        }
    }
    /**
     * Count the rows accepted by the predicate.
     *
     * @returns The number of accepted rows across all batches.
     */
    public count(): number {
        // inlined version of this:
        // let sum = 0;
        // this.parent.scan((idx, columns) => {
        //     if (this.predicate(idx, columns)) ++sum;
        // });
        // return sum;
        let sum = 0;
        const batches = this._chunks;
        const numBatches = batches.length;
        for (let batchIndex = -1; ++batchIndex < numBatches;) {
            // load batches
            const batch = batches[batchIndex];
            const predicate = this._predicate.bind(batch);
            // test all indices
            for (let index = -1, numRows = batch.length; ++index < numRows;) {
                if (predicate(index, batch)) { ++sum; }
            }
        }
        return sum;
    }
    /**
     * Iterate, in ascending order, over the row values (`batch.get(index)`)
     * of every row accepted by the predicate.
     */
    public *[Symbol.iterator](): IterableIterator<Struct<T>['TValue']> {
        // inlined version of this:
        // this.parent.scan((idx, columns) => {
        //     if (this.predicate(idx, columns)) next(idx, columns);
        // });
        const batches = this._chunks;
        const numBatches = batches.length;
        for (let batchIndex = -1; ++batchIndex < numBatches;) {
            // load batches
            const batch = batches[batchIndex];
            // the predicate has to be bound to each batch before it can be
            // evaluated against that batch's rows
            const predicate = this._predicate.bind(batch);
            // yield all matching rows
            for (let index = -1, numRows = batch.length; ++index < numRows;) {
                if (predicate(index, batch)) { yield batch.get(index) as any; }
            }
        }
    }
    /**
     * Narrow this frame further: the result uses this frame's predicate
     * combined with `predicate` through `Predicate.and`.
     *
     * @param predicate The additional predicate.
     * @returns A new FilteredDataFrame over the same chunks.
     */
    public filter(predicate: Predicate): FilteredDataFrame<T> {
        return new FilteredDataFrame<T>(
            this._chunks,
            this._predicate.and(predicate)
        );
    }
    /**
     * Count how many rows accepted by the predicate reference each
     * dictionary entry of a dictionary-encoded column. Rows whose key is
     * null are not counted.
     *
     * @param name The column to count by, as a `Col` or a column name.
     * @returns A CountByResult pairing the column's dictionary with one
     *          counter per dictionary entry.
     * @throws Error if the column is not dictionary-encoded.
     */
    public countBy(name: Col | string) {
        const batches = this._chunks, numBatches = batches.length;
        const countCol = typeof name === 'string' ? new Col(name) : name as Col;
        // Assume that all dictionary batches are deltas, which means that the
        // last record batch has the most complete dictionary.
        // NOTE(review): with zero batches this binds against `undefined`;
        // what happens then depends on Col.bind.
        countCol.bind(batches[numBatches - 1]);
        const vector = countCol.vector as V<Dictionary>;
        if (!DataType.isDictionary(vector.type)) {
            throw new Error('countBy currently only supports dictionary-encoded columns');
        }

        // A row contributes to at most one counter, so the total number of
        // rows across *all* batches is an upper bound for every counter
        // (the predicate can only lower the real counts).
        let totalRows = 0;
        for (let batchIndex = -1; ++batchIndex < numBatches;) {
            totalRows += batches[batchIndex].length;
        }
        // Pick the narrowest unsigned array whose maximum value can still
        // hold that bound, so counters never wrap around.
        const CountsArrayType = totalRows > 0xFFFF ? Uint32Array :
                                totalRows > 0xFF ? Uint16Array : Uint8Array;

        const counts = new CountsArrayType(vector.dictionary.length);

        for (let batchIndex = -1; ++batchIndex < numBatches;) {
            // load batches
            const batch = batches[batchIndex];
            const predicate = this._predicate.bind(batch);
            // rebind the countBy Col so its vector refers to this batch
            countCol.bind(batch);
            const keys = (countCol.vector as V<Dictionary>).indices;
            // tally the dictionary key of every accepted row
            for (let index = -1, numRows = batch.length; ++index < numRows;) {
                const key = keys.get(index);
                if (key !== null && predicate(index, batch)) { counts[key]++; }
            }
        }
        return new CountByResult(vector.dictionary, IntVector.from(counts));
    }
}
253