1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18import { Data } from './data';
19import { Table } from './table';
20import { Vector } from './vector';
21import { Visitor } from './visitor';
22import { Schema, Field } from './schema';
23import { isIterable } from './util/compat';
24import { Chunked } from './vector/chunked';
25import { selectFieldArgs } from './util/args';
26import { DataType, Struct, Dictionary } from './type';
27import { ensureSameLengthData } from './util/recordbatch';
28import { Clonable, Sliceable, Applicative } from './vector';
29import { StructVector, VectorBuilderOptions, VectorBuilderOptionsAsync } from './vector/index';
30
/** Column name -> vector, as accepted by the map form of `RecordBatch.new`. */
type VectorMap = Record<string, Vector>;
/** Column labels: either keys of `T` or explicit `Field`s. */
type Fields<T extends Record<string, DataType>> = Array<keyof T> | Array<Field<T[keyof T]>>;
/** Child columns, each given either as raw `Data` or as a `Vector`. */
type ChildData<T extends Record<string, DataType>> = Array<Data<T[keyof T]> | Vector<T[keyof T]>>;
34
// Merges with the `RecordBatch` class declared below (TypeScript declaration
// merging) so these methods are typed to return `Table<T>` / `RecordBatch<T>`.
// `slice` is not redefined in the class body, so without this declaration it
// would presumably keep the inherited StructVector signature.
export interface RecordBatch<T extends { [key: string]: DataType } = any> {
    /** Concatenate this batch with other struct vectors into a `Table<T>`. */
    concat(...others: Vector<Struct<T>>[]): Table<T>;
    /** Slice rows between `begin` and `end`, returning a `RecordBatch<T>`. */
    slice(begin?: number, end?: number): RecordBatch<T>;
    /** Create a new `RecordBatch<T>` over `data`, optionally with explicit child vectors. */
    clone(data: Data<Struct<T>>, children?: Vector[]): RecordBatch<T>;
}
40
/**
 * A collection of columns, stored as the children of a struct vector, together
 * with the `Schema` that describes them.
 *
 * `concat` produces a `Table`; `clone`, `select` and `selectAt` produce new
 * `RecordBatch` instances.
 */
export class RecordBatch<T extends { [key: string]: DataType } = any>
    extends StructVector<T>
    implements Clonable<RecordBatch<T>>,
               Sliceable<RecordBatch<T>>,
               Applicative<Struct<T>, Table<T>> {

    /**
     * Build a `Table` from vector-builder options by delegating to `Table.from`.
     *
     * When `options.values` is a synchronous iterable, the synchronous overload
     * is used and a `Table` is returned. Otherwise `options` is treated as async
     * builder options and a `Promise<Table>` is returned.
     */
    public static from<T extends { [key: string]: DataType } = any, TNull = any>(options: VectorBuilderOptions<Struct<T>, TNull>): Table<T>;
    public static from<T extends { [key: string]: DataType } = any, TNull = any>(options: VectorBuilderOptionsAsync<Struct<T>, TNull>): Promise<Table<T>>;
    /** @nocollapse */
    public static from<T extends { [key: string]: DataType } = any, TNull = any>(options: VectorBuilderOptions<Struct<T>, TNull> | VectorBuilderOptionsAsync<Struct<T>, TNull>) {
        if (isIterable<(Struct<T>)['TValue'] | TNull>(options['values'])) {
            return Table.from(options as VectorBuilderOptions<Struct<T>, TNull>);
        }
        return Table.from(options as VectorBuilderOptionsAsync<Struct<T>, TNull>);
    }

    /**
     * Create a `RecordBatch` from one of two inputs:
     * - a map of column name -> `Vector`, or
     * - an array of child `Data`/`Vector`s, with optional column names or `Field`s.
     *
     * Names/fields and values are paired by `selectFieldArgs`. The result is
     * passed through `ensureSameLengthData` to produce the constructor arguments.
     */
    public static new<T extends VectorMap = any>(children: T): RecordBatch<{ [P in keyof T]: T[P]['type'] }>;
    public static new<T extends { [key: string]: DataType } = any>(children: ChildData<T>, fields?: Fields<T>): RecordBatch<T>;
    /** @nocollapse */
    public static new<T extends { [key: string]: DataType } = any>(...args: any[]) {
        const [fs, xs] = selectFieldArgs<T>(args);
        // NOTE(review): only `Vector` instances survive this filter. Any raw `Data`
        // child must therefore already have been wrapped by `selectFieldArgs`;
        // confirm against util/args.
        const vs = xs.filter((x): x is Vector<T[keyof T]> => x instanceof Vector);
        return new RecordBatch(...ensureSameLengthData(new Schema<T>(fs), vs.map((x) => x.data)));
    }

    /** The schema this batch was constructed with. */
    protected _schema: Schema;
    /** Lazily populated cache backing the `dictionaries` getter. */
    protected _dictionaries?: Map<number, Vector>;

    /**
     * @param schema the schema describing the columns
     * @param length the row count of the struct `Data` built over `children`
     * @param children the column `Data` (or `Vector`s), positionally matching `schema.fields`
     */
    constructor(schema: Schema<T>, length: number, children: (Data | Vector)[]);
    /**
     * @param schema the schema describing the columns
     * @param data an existing struct `Data` to wrap as-is
     * @param children optional pre-built child vectors, forwarded to the base class
     */
    constructor(schema: Schema<T>, data: Data<Struct<T>>, children?: Vector[]);
    constructor(...args: any[]) {
        const schema = args[0] as Schema<T>;
        let data: Data<Struct<T>>;
        let children: Vector[] | undefined;
        if (args[1] instanceof Data) {
            // (schema, data, children?): wrap the provided struct Data directly.
            [, data, children] = (args as [any, Data<Struct<T>>, Vector<T[keyof T]>[]?]);
        } else {
            // (schema, length, childData): build a struct Data over the children
            // using a Struct type made from the schema's fields. The literal
            // `0, … 0, null` arguments are presumably offset, nullCount and
            // nullBitmap; see Data.Struct.
            const fields = schema.fields as Field<T[keyof T]>[];
            const [, length, childData] = args as [any, number, Data<T[keyof T]>[]];
            data = Data.Struct(new Struct<T>(fields), 0, length, 0, null, childData);
        }
        super(data, children);
        this._schema = schema;
    }

    /**
     * Create a new `RecordBatch` with this batch's schema over `data`.
     * @param data the struct `Data` for the new batch
     * @param children child vectors for the new batch; defaults to this batch's `_children`
     */
    public clone(data: Data<Struct<T>>, children = this._children) {
        return new RecordBatch<T>(this._schema, data, children);
    }

    /**
     * Concatenate this batch with `others` into a `Table`.
     *
     * The inputs are flattened into chunks via `Chunked.flatten`. Each chunk's
     * `data` is then re-wrapped as a `RecordBatch` carrying this batch's schema.
     */
    public concat(...others: Vector<Struct<T>>[]): Table<T> {
        const schema = this._schema, chunks = Chunked.flatten(this, ...others);
        return new Table(schema, chunks.map(({ data }) => new RecordBatch(schema, data)));
    }

    /** The schema this batch was constructed with. */
    public get schema() { return this._schema; }
    /** The number of fields in this batch's schema. */
    public get numCols() { return this._schema.fields.length; }
    /**
     * Non-empty dictionaries reachable from this batch's columns, keyed by
     * dictionary id. Computed on first access and cached afterwards.
     */
    public get dictionaries() {
        return this._dictionaries || (this._dictionaries = DictionaryCollector.collect(this));
    }

    /**
     * Select columns by name, in the order given.
     *
     * Names that are not in the schema are ignored. If the schema contains
     * duplicate names, the last field with that name is the one selected.
     */
    public select<K extends keyof T = any>(...columnNames: K[]) {
        const nameToIndex = this._schema.fields.reduce((m, f, i) => m.set(f.name as K, i), new Map<K, number>());
        const columnIndices = columnNames
            .map((columnName) => nameToIndex.get(columnName))
            // Unknown names map to `undefined`. Drop them explicitly instead of
            // asserting non-null and relying on `undefined > -1` being false.
            .filter((index): index is number => index !== undefined);
        return this.selectAt(...columnIndices);
    }

    /**
     * Select columns by index, in the order given.
     *
     * Indices without child data are dropped from the child list.
     * NOTE(review): this relies on `Schema.selectAt` dropping the same indices,
     * so that fields and child data stay aligned; confirm.
     */
    public selectAt<K extends T[keyof T] = any>(...columnIndices: number[]) {
        const schema = this._schema.selectAt(...columnIndices);
        const childData = columnIndices.map((i) => this.data.childData[i]).filter(Boolean);
        return new RecordBatch<{ [key: string]: K }>(schema, this.length, childData);
    }
}
111
/**
 * Internal marker batch used by the `RecordBatchReader` and `RecordBatchWriter`
 * implementations. It distinguishes a stream that contains valid zero-length
 * RecordBatches from a stream that has a Schema message but no RecordBatches.
 * @see https://github.com/apache/arrow/pull/4373
 * @ignore
 * @private
 */
/* tslint:disable:class-name */
export class _InternalEmptyPlaceholderRecordBatch<T extends { [key: string]: DataType } = any> extends RecordBatch<T> {
    /**
     * Build a batch of length 0, with one child `Data` per schema field
     * created via `Data.new(type, 0, 0, 0)`.
     * @param schema the schema whose fields determine the child columns
     */
    constructor(schema: Schema<T>) {
        const emptyColumns = schema.fields.map(({ type }) => Data.new(type, 0, 0, 0));
        super(schema, 0, emptyColumns);
    }
}
126
/**
 * Walks a RecordBatch's struct `Data` alongside its type tree and records
 * every non-empty dictionary it finds, keyed by the Dictionary type's `id`.
 * @ignore
 */
class DictionaryCollector extends Visitor {
    public dictionaries = new Map<number, Vector>();

    /** Collect the dictionaries reachable from `batch`'s columns. */
    public static collect<T extends RecordBatch>(batch: T) {
        const collector = new DictionaryCollector();
        const rootType = new Struct(batch.schema.fields);
        return collector.visit(batch.data, rootType).dictionaries;
    }

    /**
     * Visit `data` with its `type`.
     *
     * A dictionary-typed node is handed to `visitDictionary` and not descended
     * into further. Any other node recurses into each child `Data`, paired with
     * the type of the child field at the same position.
     */
    public visit(data: Data, type: DataType) {
        if (DataType.isDictionary(type)) {
            return this.visitDictionary(data, type);
        }
        data.childData.forEach((childData, position) => {
            const childType = type.children[position].type;
            this.visit(childData, childType);
        });
        return this;
    }

    /**
     * Record `data.dictionary` under `type.id` when it is present and non-empty.
     * A later dictionary with the same id replaces an earlier one (Map.set).
     */
    public visitDictionary(data: Data, type: Dictionary) {
        const values = data.dictionary;
        if (values && values.length > 0) {
            this.dictionaries.set(type.id, values);
        }
        return this;
    }
}
152