1#! /usr/bin/env node
2
3// Licensed to the Apache Software Foundation (ASF) under one
4// or more contributor license agreements.  See the NOTICE file
5// distributed with this work for additional information
6// regarding copyright ownership.  The ASF licenses this file
7// to you under the Apache License, Version 2.0 (the
8// "License"); you may not use this file except in compliance
9// with the License.  You may obtain a copy of the License at
10//
11//   http://www.apache.org/licenses/LICENSE-2.0
12//
13// Unless required by applicable law or agreed to in writing,
14// software distributed under the License is distributed on an
15// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16// KIND, either express or implied.  See the License for the
17// specific language governing permissions and limitations
18// under the License.
19
20/* tslint:disable */
21
22import * as fs from 'fs';
23import * as stream from 'stream';
24import { valueToString } from '../util/pretty';
25import { Schema, RecordBatch, RecordBatchReader, AsyncByteQueue } from '../Arrow.node';
26
// Third-party CLI helpers (loaded via `require`, so they stay untyped).
const padLeft = require('pad-left');
// JSON parser that handles integers wider than JS numbers (json-bignum) —
// Arrow JSON test data may contain 64-bit values.
const bignumJSONParse = require('json-bignum').parse;
// Parse CLI flags; `partial: true` collects unrecognized tokens in `_unknown`
// instead of throwing, so bare positional file names still work.
const argv = require(`command-line-args`)(cliOpts(), { partial: true });
// Inputs to read: explicit --file values plus any bare positional arguments.
const files = argv.help ? [] : [...(argv.file || []), ...(argv._unknown || [])].filter(Boolean);

// Mutable state shared with the stream transforms below: `closed` is flipped
// when the output pipe breaks (e.g. EPIPE) so we stop writing; `maxColWidths`
// tracks the widest cell seen per column (index 0 is the row_id column).
const state = { ...argv, closed: false, maxColWidths: [10] };
33
// Shape of the mutable CLI/rendering state threaded into `batchesToString`.
type ToStringState = {
    hr: string;             // horizontal border character ('' disables rules)
    sep: string;            // column separator string, e.g. ' | '
    schema: any;            // column names to select (--schema), if provided
    closed: boolean;        // set once the output pipe closes (e.g. EPIPE)
    metadata: boolean;      // whether to print Schema metadata (--metadata)
    maxColWidths: number[]; // widest cell seen per column; index 0 is row_id
};
42
(async () => {

    // Input sources as lazy stream factories: one per file argument, plus
    // stdin when it is not a TTY (i.e. data is actually being piped in).
    const sources = argv.help ? [] : [
        ...files.map((file) => () => fs.createReadStream(file)),
        ...(process.stdin.isTTY ? [] : [() => process.stdin])
    ].filter(Boolean) as (() => NodeJS.ReadableStream)[];

    let reader: RecordBatchReader | null;
    let hasReaders = false;

    for (const source of sources) {
        if (state.closed) { break; }
        // A single source may contain multiple concatenated Arrow streams
        for await (reader of recordBatchReaders(source)) {
            hasReaders = true;
            const transformToString = batchesToString(state, reader.schema);
            await pipeTo(
                reader.pipe(transformToString),
                process.stdout, { end: false }
            ).catch(() => state.closed = true); // Handle EPIPE errors
        }
        if (state.closed) { break; }
    }

    // Exit 0 if anything was printed; otherwise show usage (which returns 1)
    return hasReaders ? 0 : print_usage();
})()
// Coerce the result to a numeric exit code; log unexpected failures and
// fall back to any exit code already set on the process.
.then((x) => +x || 0, (err) => {
    if (err) {
        console.error(`${err && err.stack || err}`);
    }
    return process.exitCode || 1;
}).then((code) => process.exit(code));
74
75function pipeTo(source: NodeJS.ReadableStream, sink: NodeJS.WritableStream, opts?: { end: boolean }) {
76    return new Promise((resolve, reject) => {
77
78        source.on('end', onEnd).pipe(sink, opts).on('error', onErr);
79
80        function onEnd() { done(undefined, resolve); }
81        function onErr(err:any) { done(err, reject); }
82        function done(e: any, cb: (e?: any) => void) {
83            source.removeListener('end', onEnd);
84            sink.removeListener('error', onErr);
85            cb(e);
86        }
87    });
88}
89
/**
 * Yields a RecordBatchReader for each Arrow stream found in the source.
 * The raw bytes are teed into two queues: one is read as binary Arrow IPC;
 * if that yields no reader, the other is parsed as Arrow JSON instead.
 */
async function *recordBatchReaders(createSourceStream: () => NodeJS.ReadableStream) {

    let json = new AsyncByteQueue();
    let stream = new AsyncByteQueue(); // NOTE(review): shadows the imported `stream` module within this function
    let source = createSourceStream();
    let reader: RecordBatchReader | null = null;
    // NOTE(review): `readers` is only ever assigned null, so `!readers` below
    // is always true — the real gate is the `if (reader) return;` early exit.
    let readers: AsyncIterable<RecordBatchReader> | null = null;
    // tee the input source, just in case it's JSON
    source.on('end', () => [stream, json].forEach((y) => y.close()))
        .on('data', (x) => [stream, json].forEach((y) => y.write(x)))
       .on('error', (e) => [stream, json].forEach((y) => y.abort(e)));

    try {
        // Attempt 1: treat the bytes as binary Arrow IPC data
        for await (reader of RecordBatchReader.readAll(stream)) {
            reader && (yield reader);
        }
        // A reader was produced, so the binary path worked — skip the JSON fallback
        if (reader) return;
    } catch (e) { readers = null; }

    if (!readers) {
        // Wait for the tee'd copy of the full input before parsing it as JSON
        await json.closed;
        if (source instanceof fs.ReadStream) { source.close(); }
        // If the data in the `json` ByteQueue parses to JSON, then assume it's Arrow JSON from a file or stdin
        try {
            for await (reader of RecordBatchReader.readAll(bignumJSONParse(await json.toString()))) {
                reader && (yield reader);
            }
        } catch (e) { readers = null; }
    }
}
120
/**
 * Builds a Transform stream that renders RecordBatches as left-padded text
 * rows. Column widths are measured per batch and shared via
 * `state.maxColWidths`, so the header is re-printed when widths change.
 */
function batchesToString(state: ToStringState, schema: Schema) {

    let rowId = 0;
    let batchId = -1;           // -1 until the first batch arrives
    let maxColWidths = [10];    // widths used for the rows printed so far
    const { hr, sep } = state;

    // Header row: "row_id" followed by each field stringified (name: Type)
    const header = ['row_id', ...schema.fields.map((f) => `${f}`)].map(valueToString);

    // Seed the shared widths with at least the header widths
    state.maxColWidths = header.map((x, i) => Math.max(maxColWidths[i] || 0, x.length));

    return new stream.Transform({
        encoding: 'utf8',
        writableObjectMode: true,   // consumes RecordBatch objects
        readableObjectMode: false,  // emits utf8 text
        final(cb: (error?: Error | null) => void) {
            // if there were no batches, then print the Schema, and metadata
            if (batchId === -1) {
                hr && this.push(`${horizontalRule(state.maxColWidths, hr, sep)}\n\n`);
                this.push(`${formatRow(header, maxColWidths, sep)}\n`);
                if (state.metadata && schema.metadata.size > 0) {
                    this.push(`metadata:\n${formatMetadata(schema.metadata)}\n`);
                }
            }
            // closing horizontal rule (only when --hr was given)
            hr && this.push(`${horizontalRule(state.maxColWidths, hr, sep)}\n\n`);
            cb();
        },
        transform(batch: RecordBatch, _enc: string, cb: (error?: Error, data?: any) => void) {

            // Narrow the batch to the --schema columns, if any were requested
            batch = !(state.schema && state.schema.length) ? batch : batch.select(...state.schema);

            // Output pipe already closed (EPIPE) — drop the batch silently
            if (state.closed) { return cb(undefined, null); }

            // Pass one to convert to strings and count max column widths
            state.maxColWidths = measureColumnWidths(rowId, batch, header.map((x, i) => Math.max(maxColWidths[i] || 0, x.length)));

            // If this is the first batch in a stream, print a top horizontal rule, schema metadata, and
            if (++batchId === 0) {
                hr && this.push(`${horizontalRule(state.maxColWidths, hr, sep)}\n`);
                if (state.metadata && batch.schema.metadata.size > 0) {
                    this.push(`metadata:\n${formatMetadata(batch.schema.metadata)}\n`);
                    hr && this.push(`${horizontalRule(state.maxColWidths, hr, sep)}\n`);
                }
                // an empty first batch still gets a header row
                if (batch.length <= 0 || batch.numCols <= 0) {
                    this.push(`${formatRow(header, maxColWidths = state.maxColWidths, sep)}\n`);
                }
            }

            if (batch.length > 0 && batch.numCols > 0) {
                // If any of the column widths changed, print the header again
                if (rowId % 350 !== 0 && JSON.stringify(state.maxColWidths) !== JSON.stringify(maxColWidths)) {
                    this.push(`${formatRow(header, state.maxColWidths, sep)}\n`);
                }
                maxColWidths = state.maxColWidths;
                for (const row of batch) {
                    if (state.closed) { break; } else if (!row) { continue; }
                    // Re-print the header every 350 rows for readability
                    if (rowId++ % 350 === 0) {
                        this.push(`${formatRow(header, maxColWidths, sep)}\n`);
                    }
                    this.push(`${formatRow([rowId, ...row.toArray()].map(valueToString), maxColWidths, sep)}\n`);
                }
            }
            cb();
        }
    });
}
187
188function horizontalRule(maxColWidths: number[], hr = '', sep = ' | ') {
189    return ` ${padLeft('', maxColWidths.reduce((x, y) => x + y, -2 + maxColWidths.length * sep.length), hr)}`;
190}
191
192function formatRow(row: string[] = [], maxColWidths: number[] = [], sep = ' | ') {
193    return `${row.map((x, j) => padLeft(x, maxColWidths[j])).join(sep)}`;
194}
195
196function formatMetadata(metadata: Map<string, string>) {
197
198    return [...metadata].map(([key, val]) =>
199        `  ${key}: ${formatMetadataValue(val)}`
200    ).join(',  \n');
201
202    function formatMetadataValue(value: string = '') {
203        let parsed = value;
204        try {
205            parsed = JSON.stringify(JSON.parse(value), null, 2);
206        } catch (e) { parsed = value; }
207        return valueToString(parsed).split('\n').join('\n  ');
208    }
209}
210
211function measureColumnWidths(rowId: number, batch: RecordBatch, maxColWidths: number[] = []) {
212    let val: any, j = 0;
213    for (const row of batch) {
214        if (!row) { continue; }
215        maxColWidths[j = 0] = Math.max(maxColWidths[0] || 0, (`${rowId++}`).length);
216        for (val of row) {
217            if (val && typedArrayElementWidths.has(val.constructor) && (typeof val[Symbol.toPrimitive] !== 'function')) {
218                // If we're printing a column of TypedArrays, ensure the column is wide enough to accommodate
219                // the widest possible element for a given byte size, since JS omits leading zeroes. For example:
220                // 1 |  [1137743649,2170567488,244696391,2122556476]
221                // 2 |                                          null
222                // 3 |   [637174007,2142281880,961736230,2912449282]
223                // 4 |    [1035112265,21832886,412842672,2207710517]
224                // 5 |                                          null
225                // 6 |                                          null
226                // 7 |     [2755142991,4192423256,2994359,467878370]
227                const elementWidth = typedArrayElementWidths.get(val.constructor)!;
228
229                maxColWidths[j + 1] = Math.max(maxColWidths[j + 1] || 0,
230                    2 + // brackets on each end
231                    (val.length - 1) + // commas between elements
232                    (val.length * elementWidth) // width of stringified 2^N-1
233                );
234            } else {
235                maxColWidths[j + 1] = Math.max(maxColWidths[j + 1] || 0, valueToString(val).length);
236            }
237            ++j;
238        }
239    }
240    return maxColWidths;
241}
242
243// Measure the stringified representation of 2^N-1 for each TypedArray variant
244const typedArrayElementWidths = (() => {
245    const maxElementWidth = (ArrayType: any) => {
246        const octets = Array.from({ length: ArrayType.BYTES_PER_ELEMENT - 1 }, _ => 255);
247        return `${new ArrayType(new Uint8Array([...octets, 254]).buffer)[0]}`.length;
248    };
249    return new Map<any, number>([
250        [Int8Array, maxElementWidth(Int8Array)],
251        [Int16Array, maxElementWidth(Int16Array)],
252        [Int32Array, maxElementWidth(Int32Array)],
253        [Uint8Array, maxElementWidth(Uint8Array)],
254        [Uint16Array, maxElementWidth(Uint16Array)],
255        [Uint32Array, maxElementWidth(Uint32Array)],
256        [Float32Array, maxElementWidth(Float32Array)],
257        [Float64Array, maxElementWidth(Float64Array)],
258        [Uint8ClampedArray, maxElementWidth(Uint8ClampedArray)]
259    ])
260})();
261
262function cliOpts() {
263    return [
264        {
265            type: String,
266            name: 'schema', alias: 's',
267            optional: true, multiple: true,
268            typeLabel: '{underline columns}',
269            description: 'A space-delimited list of column names'
270        },
271        {
272            type: String,
273            name: 'file', alias: 'f',
274            optional: true, multiple: true,
275            description: 'The Arrow file to read'
276        },
277        {
278            type: String,
279            name: 'sep', optional: true, default: ' | ',
280            description: 'The column separator character (default: " | ")'
281        },
282        {
283            type: String,
284            name: 'hr', optional: true, default: '',
285            description: 'The horizontal border character (default: "")'
286        },
287        {
288            type: Boolean,
289            name: 'metadata', alias: 'm',
290            optional: true, default: false,
291            description: 'Flag to print Schema metadata (default: false)'
292        },
293        {
294            type: Boolean,
295            name: 'help', optional: true, default: false,
296            description: 'Print this usage guide.'
297        }
298    ];
299}
300
301function print_usage() {
302    console.log(require('command-line-usage')([
303        {
304            header: 'arrow2csv',
305            content: 'Print a CSV from an Arrow file'
306        },
307        {
308            header: 'Synopsis',
309            content: [
310                '$ arrow2csv {underline file.arrow} [{bold --schema} column_name ...]',
311                '$ arrow2csv [{bold --schema} column_name ...] [{bold --file} {underline file.arrow}]',
312                '$ arrow2csv {bold -s} column_1 {bold -s} column_2 [{bold -f} {underline file.arrow}]',
313                '$ arrow2csv [{bold --help}]'
314            ]
315        },
316        {
317            header: 'Options',
318            optionList: cliOpts()
319        },
320        {
321            header: 'Example',
322            content: [
323                '$ arrow2csv --schema foo baz --sep " , " -f simple.arrow',
324                '>   "row_id", "foo: Int32", "baz: Utf8"',
325                '>          0,            1,        "aa"',
326                '>          1,         null,        null',
327                '>          2,            3,        null',
328                '>          3,            4,       "bbb"',
329                '>          4,            5,      "cccc"',
330            ]
331        }
332    ]));
333    return 1;
334}
335