// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

import { Data } from './data';
import { Table } from './table';
import { Vector } from './vector';
import { Visitor } from './visitor';
import { Schema, Field } from './schema';
import { isIterable } from './util/compat';
import { Chunked } from './vector/chunked';
import { selectFieldArgs } from './util/args';
import { DataType, Struct, Dictionary } from './type';
import { ensureSameLengthData } from './util/recordbatch';
import { Clonable, Sliceable, Applicative } from './vector';
import { StructVector, VectorBuilderOptions, VectorBuilderOptionsAsync } from './vector/index';

/** Object form accepted by `RecordBatch.new`: field name -> child Vector. */
type VectorMap = { [key: string]: Vector };
/** Field descriptors accepted by `RecordBatch.new`: either names or `Field` instances. */
type Fields<T extends { [key: string]: DataType }> = (keyof T)[] | Field<T[keyof T]>[];
/** Child columns accepted by `RecordBatch.new`: `Data` or `Vector` instances. */
type ChildData<T extends { [key: string]: DataType }> = (Data<T[keyof T]> | Vector<T[keyof T]>)[];

/**
 * Declaration-merged with the `RecordBatch` class below to pin the
 * signatures of `concat`, `slice` and `clone` to RecordBatch/Table types.
 */
export interface RecordBatch<T extends { [key: string]: DataType } = any> {
    concat(...others: Vector<Struct<T>>[]): Table<T>;
    slice(begin?: number, end?: number): RecordBatch<T>;
    clone(data: Data<Struct<T>>, children?: Vector[]): RecordBatch<T>;
}

/**
 * A `StructVector` that additionally carries the `Schema` describing its
 * child columns, and lazily collects the dictionaries referenced by its data.
 */
export class RecordBatch<T extends { [key: string]: DataType } = any>
    extends StructVector<T>
    implements Clonable<RecordBatch<T>>,
               Sliceable<RecordBatch<T>>,
               Applicative<Struct<T>, Table<T>> {

    /**
     * Forwards the builder options to `Table.from`. The sync overload is
     * chosen when `options.values` is iterable, the async overload otherwise.
     */
    public static from<T extends { [key: string]: DataType } = any, TNull = any>(options: VectorBuilderOptions<Struct<T>, TNull>): Table<T>;
    public static from<T extends { [key: string]: DataType } = any, TNull = any>(options: VectorBuilderOptionsAsync<Struct<T>, TNull>): Promise<Table<T>>;
    /** @nocollapse */
    public static from<T extends { [key: string]: DataType } = any, TNull = any>(options: VectorBuilderOptions<Struct<T>, TNull> | VectorBuilderOptionsAsync<Struct<T>, TNull>) {
        return isIterable<(Struct<T>)['TValue'] | TNull>(options['values'])
            ? Table.from(options as VectorBuilderOptions<Struct<T>, TNull>)
            : Table.from(options as VectorBuilderOptionsAsync<Struct<T>, TNull>);
    }

    /**
     * Builds a RecordBatch from child columns. The arguments are normalized
     * by `selectFieldArgs`; only the resulting values that are `Vector`
     * instances are kept, and their `data` is passed through
     * `ensureSameLengthData` together with a Schema built from the fields.
     */
    public static new<T extends VectorMap = any>(children: T): RecordBatch<{ [P in keyof T]: T[P]['type'] }>;
    public static new<T extends { [key: string]: DataType } = any>(children: ChildData<T>, fields?: Fields<T>): RecordBatch<T>;
    /** @nocollapse */
    public static new<T extends { [key: string]: DataType } = any>(...args: any[]) {
        const [fields, values] = selectFieldArgs<T>(args);
        const vectors = values.filter((value): value is Vector<T[keyof T]> => value instanceof Vector);
        const schema = new Schema<T>(fields);
        const childData = vectors.map((vector) => vector.data);
        return new RecordBatch(...ensureSameLengthData(schema, childData));
    }

    protected _schema: Schema;
    // Populated on first access of the `dictionaries` getter.
    protected _dictionaries?: Map<number, Vector>;

    /**
     * Two call shapes are supported:
     *  - `(schema, length, children)`: a Struct `Data` is assembled from the
     *    schema's fields, the given length and the child data.
     *  - `(schema, data, children?)`: the given Struct `Data` is used as-is.
     */
    constructor(schema: Schema<T>, length: number, children: (Data | Vector)[]);
    constructor(schema: Schema<T>, data: Data<Struct<T>>, children?: Vector[]);
    constructor(...args: any[]) {
        const schema = args[0] as Schema<T>;
        let structData: Data<Struct<T>>;
        let childVectors: Vector[] | undefined;
        if (!(args[1] instanceof Data)) {
            // (schema, length, children): build the Struct data ourselves.
            const fields = schema.fields as Field<T[keyof T]>[];
            const length = args[1] as number;
            const childData = args[2] as Data<T[keyof T]>[];
            structData = Data.Struct(new Struct<T>(fields), 0, length, 0, null, childData);
        } else {
            // (schema, data, children?): reuse the caller's Struct data.
            structData = args[1] as Data<Struct<T>>;
            childVectors = args[2] as Vector<T[keyof T]>[] | undefined;
        }
        super(structData, childVectors);
        this._schema = schema;
    }

    /** Creates a RecordBatch over `data` that shares this batch's schema. */
    public clone(data: Data<Struct<T>>, children = this._children) {
        const schema = this._schema;
        return new RecordBatch<T>(schema, data, children);
    }

    /**
     * Flattens this batch and `others` via `Chunked.flatten`, wraps each
     * resulting chunk's data in a RecordBatch with this batch's schema, and
     * returns them as a Table.
     */
    public concat(...others: Vector<Struct<T>>[]): Table<T> {
        const schema = this._schema;
        const batches = Chunked.flatten(this, ...others)
            .map((chunk) => new RecordBatch(schema, chunk.data));
        return new Table(schema, batches);
    }

    /** The schema this batch was constructed with. */
    public get schema() {
        return this._schema;
    }

    /** Number of fields in the schema. */
    public get numCols() {
        return this._schema.fields.length;
    }

    /** Dictionaries found in this batch's data, keyed by dictionary id; computed once and cached. */
    public get dictionaries() {
        const cached = this._dictionaries;
        if (cached) {
            return cached;
        }
        return (this._dictionaries = DictionaryCollector.collect(this));
    }

    /**
     * Selects columns by field name. Names that do not match any schema
     * field are dropped before delegating to `selectAt`.
     */
    public select<K extends keyof T = any>(...columnNames: K[]) {
        const indexByName = new Map<K, number>();
        this._schema.fields.forEach((field, index) => {
            indexByName.set(field.name as K, index);
        });
        const columnIndices = columnNames
            .map((name) => indexByName.get(name)!)
            // Unknown names yield `undefined`, which fails this comparison.
            .filter((index) => index > -1);
        return this.selectAt(...columnIndices);
    }

    /**
     * Selects columns by position. Positions with no child data are
     * skipped; the result keeps this batch's length.
     */
    public selectAt<K extends T[keyof T] = any>(...columnIndices: number[]) {
        const selectedSchema = this._schema.selectAt(...columnIndices);
        const selectedChildren = columnIndices
            .map((columnIndex) => this.data.childData[columnIndex])
            .filter(Boolean);
        return new RecordBatch<{ [key: string]: K }>(selectedSchema, this.length, selectedChildren);
    }
}

/**
 * An internal class used by the `RecordBatchReader` and `RecordBatchWriter`
 * implementations to differentiate between a stream with valid zero-length
 * RecordBatches, and a stream with a Schema message, but no RecordBatches.
 * @see https://github.com/apache/arrow/pull/4373
 * @ignore
 * @private
 */
/* tslint:disable:class-name */
export class _InternalEmptyPlaceholderRecordBatch<T extends { [key: string]: DataType } = any> extends RecordBatch<T> {
    constructor(schema: Schema<T>) {
        // One zero-length Data per schema field.
        super(schema, 0, schema.fields.map((field) => Data.new(field.type, 0, 0, 0)));
    }
}

/**
 * Walks a Data tree alongside its DataType and records every non-empty
 * dictionary it encounters, keyed by the Dictionary type's id.
 * @ignore
 */
class DictionaryCollector extends Visitor {
    public dictionaries = new Map<number, Vector>();

    /** Visits `batch.data` as a Struct of the batch's schema fields and returns the dictionaries found. */
    public static collect<T extends RecordBatch>(batch: T) {
        const collector = new DictionaryCollector();
        collector.visit(batch.data, new Struct(batch.schema.fields));
        return collector.dictionaries;
    }

    /** Records the dictionary for Dictionary types; otherwise recurses into each child. */
    public visit(data: Data, type: DataType) {
        if (DataType.isDictionary(type)) {
            return this.visitDictionary(data, type);
        }
        data.childData.forEach((childData, childIndex) => {
            this.visit(childData, type.children[childIndex].type);
        });
        return this;
    }

    /** Stores `data.dictionary` under `type.id` when it exists and is non-empty. */
    public visitDictionary(data: Data, type: Dictionary) {
        const { dictionary } = data;
        if (dictionary && dictionary.length > 0) {
            this.dictionaries.set(type.id, dictionary);
        }
        return this;
    }
}