#! /usr/bin/env node

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

/* tslint:disable */

// arrow2csv: CLI that reads Arrow record batches from files and/or stdin
// (Arrow IPC format, with a fallback to Arrow JSON) and pretty-prints them
// as aligned, separator-delimited text rows on stdout.

import * as fs from 'fs';
import * as stream from 'stream';
import { valueToString } from '../util/pretty';
import { Schema, RecordBatch, RecordBatchReader, AsyncByteQueue } from '../Arrow.node';

// CommonJS dependencies without TS type declarations, loaded via require.
const padLeft = require('pad-left');
// JSON parser that preserves big (64-bit) integers, needed for Arrow JSON.
const bignumJSONParse = require('json-bignum').parse;
// `partial: true` lets unknown positional args through as `argv._unknown`.
const argv = require(`command-line-args`)(cliOpts(), { partial: true });
// Treat both --file values and unknown positional args as input files.
const files = argv.help ? [] : [...(argv.file || []), ...(argv._unknown || [])].filter(Boolean);

// Shared mutable run state: a copy of the parsed CLI options plus
// `closed` (set once stdout goes away, e.g. on EPIPE, to stop all work)
// and the running per-column width measurements.
const state = { ...argv, closed: false, maxColWidths: [10] };

// Shape of the state object threaded through batchesToString.
type ToStringState = {
    hr: string;            // horizontal-rule character ('' disables rules)
    sep: string;           // column separator string
    schema: any;           // optional list of column names to select
    closed: boolean;       // true once the output sink has gone away
    metadata: boolean;     // whether to print schema metadata
    maxColWidths: number[]; // measured width of each column (index 0 = row_id)
};

// Main entry point: stream every input source through the formatter, then
// exit 0 if at least one reader was found, otherwise print usage and exit 1.
(async () => {

    // Lazily-created input streams: one per file argument, plus stdin when
    // it is piped (not a TTY). With --help, no sources are consumed.
    const sources = argv.help ? [] : [
        ...files.map((file) => () => fs.createReadStream(file)),
        ...(process.stdin.isTTY ? [] : [() => process.stdin])
    ].filter(Boolean) as (() => NodeJS.ReadableStream)[];

    let reader: RecordBatchReader | null;
    let hasReaders = false;

    for (const source of sources) {
        if (state.closed) { break; }
        // A single byte stream may contain multiple concatenated readers.
        for await (reader of recordBatchReaders(source)) {
            hasReaders = true;
            const transformToString = batchesToString(state, reader.schema);
            await pipeTo(
                reader.pipe(transformToString),
                process.stdout, { end: false } // keep stdout open across readers
            ).catch(() => state.closed = true); // Handle EPIPE errors
        }
        if (state.closed) { break; }
    }

    return hasReaders ? 0 : print_usage();
})()
.then((x) => +x || 0, (err) => {
    // Resolve to a numeric exit code; on error, log and exit non-zero.
    if (err) {
        console.error(`${err && err.stack || err}`);
    }
    return process.exitCode || 1;
}).then((code) => process.exit(code));

/**
 * Pipes `source` into `sink` and resolves when `source` ends, or rejects on
 * a `sink` error (e.g. EPIPE when stdout's consumer goes away). Listeners
 * are removed in either case so repeated pipes don't accumulate handlers.
 */
function pipeTo(source: NodeJS.ReadableStream, sink: NodeJS.WritableStream, opts?: { end: boolean }) {
    return new Promise((resolve, reject) => {

        source.on('end', onEnd).pipe(sink, opts).on('error', onErr);

        function onEnd() { done(undefined, resolve); }
        function onErr(err: any) { done(err, reject); }
        function done(e: any, cb: (e?: any) => void) {
            source.removeListener('end', onEnd);
            sink.removeListener('error', onErr);
            cb(e);
        }
    });
}

/**
 * Async generator yielding every RecordBatchReader found in the byte stream
 * produced by `createSourceStream`. First tries to read the bytes as Arrow
 * IPC; if that yields nothing, falls back to parsing the buffered bytes as
 * Arrow JSON. The input is tee'd into two AsyncByteQueues up front so the
 * JSON fallback can re-read the full contents after the IPC attempt.
 */
async function *recordBatchReaders(createSourceStream: () => NodeJS.ReadableStream) {

    let json = new AsyncByteQueue();
    let stream = new AsyncByteQueue();
    let source = createSourceStream();
    let reader: RecordBatchReader | null = null;
    // NOTE(review): `readers` is only ever assigned null (here and in both
    // catch blocks), so the `if (!readers)` guard below is always true and
    // the JSON fallback always runs when the IPC pass yields no reader.
    // Looks vestigial — confirm before removing.
    let readers: AsyncIterable<RecordBatchReader> | null = null;
    // tee the input source, just in case it's JSON
    source.on('end', () => [stream, json].forEach((y) => y.close()))
        .on('data', (x) => [stream, json].forEach((y) => y.write(x)))
        .on('error', (e) => [stream, json].forEach((y) => y.abort(e)));

    try {
        // First pass: treat the bytes as Arrow IPC (file or stream format).
        for await (reader of RecordBatchReader.readAll(stream)) {
            reader && (yield reader);
        }
        // If IPC reading produced at least one reader, we're done.
        if (reader) return;
    } catch (e) { readers = null; }

    if (!readers) {
        // Wait for the tee'd copy to hold the complete input.
        await json.closed;
        if (source instanceof fs.ReadStream) { source.close(); }
        // If the data in the `json` ByteQueue parses to JSON, then assume it's Arrow JSON from a file or stdin
        try {
            for await (reader of RecordBatchReader.readAll(bignumJSONParse(await json.toString()))) {
                reader && (yield reader);
            }
        } catch (e) { readers = null; } // not JSON either: yield nothing
    }
}

/**
 * Builds a Transform stream that consumes RecordBatches (object mode) and
 * emits formatted utf8 text rows. Column widths are re-measured per batch;
 * the header row is re-printed every 350 rows and whenever the measured
 * widths change between batches. When the stream ends without any batches,
 * the schema header (and optionally its metadata) is still printed.
 */
function batchesToString(state: ToStringState, schema: Schema) {

    let rowId = 0;
    let batchId = -1;
    let maxColWidths = [10]; // last widths the header was printed with
    const { hr, sep } = state;

    // Header cells: the row-id column followed by one "name: Type" per field.
    const header = ['row_id', ...schema.fields.map((f) => `${f}`)].map(valueToString);

    // Seed the shared widths with at least the header cell widths.
    state.maxColWidths = header.map((x, i) => Math.max(maxColWidths[i] || 0, x.length));

    return new stream.Transform({
        encoding: 'utf8',
        writableObjectMode: true,
        readableObjectMode: false,
        final(cb: (error?: Error | null) => void) {
            // if there were no batches, then print the Schema, and metadata
            if (batchId === -1) {
                hr && this.push(`${horizontalRule(state.maxColWidths, hr, sep)}\n\n`);
                this.push(`${formatRow(header, maxColWidths, sep)}\n`);
                if (state.metadata && schema.metadata.size > 0) {
                    this.push(`metadata:\n${formatMetadata(schema.metadata)}\n`);
                }
            }
            // Closing horizontal rule (when --hr is set).
            hr && this.push(`${horizontalRule(state.maxColWidths, hr, sep)}\n\n`);
            cb();
        },
        transform(batch: RecordBatch, _enc: string, cb: (error?: Error, data?: any) => void) {

            // Restrict to the --schema columns when given.
            batch = !(state.schema && state.schema.length) ? batch : batch.select(...state.schema);

            // Output is gone (EPIPE): drop the batch and keep draining.
            if (state.closed) { return cb(undefined, null); }

            // Pass one to convert to strings and count max column widths
            state.maxColWidths = measureColumnWidths(rowId, batch, header.map((x, i) => Math.max(maxColWidths[i] || 0, x.length)));

            // If this is the first batch in a stream, print a top horizontal rule, schema metadata, and
            if (++batchId === 0) {
                hr && this.push(`${horizontalRule(state.maxColWidths, hr, sep)}\n`);
                if (state.metadata && batch.schema.metadata.size > 0) {
                    this.push(`metadata:\n${formatMetadata(batch.schema.metadata)}\n`);
                    hr && this.push(`${horizontalRule(state.maxColWidths, hr, sep)}\n`);
                }
                // Empty first batch: print just the header now.
                if (batch.length <= 0 || batch.numCols <= 0) {
                    this.push(`${formatRow(header, maxColWidths = state.maxColWidths, sep)}\n`);
                }
            }

            if (batch.length > 0 && batch.numCols > 0) {
                // If any of the column widths changed, print the header again
                // (skipped when the 350-row reprint below will fire anyway).
                if (rowId % 350 !== 0 && JSON.stringify(state.maxColWidths) !== JSON.stringify(maxColWidths)) {
                    this.push(`${formatRow(header, state.maxColWidths, sep)}\n`);
                }
                maxColWidths = state.maxColWidths;
                for (const row of batch) {
                    if (state.closed) { break; } else if (!row) { continue; }
                    // Reprint the header every 350 rows for readability.
                    if (rowId++ % 350 === 0) {
                        this.push(`${formatRow(header, maxColWidths, sep)}\n`);
                    }
                    this.push(`${formatRow([rowId, ...row.toArray()].map(valueToString), maxColWidths, sep)}\n`);
                }
            }
            cb();
        }
    });
}

/**
 * Returns a full-width horizontal rule built from the `hr` character, sized
 * to span all columns plus the separators between them.
 */
function horizontalRule(maxColWidths: number[], hr = '', sep = ' | ') {
    return ` ${padLeft('', maxColWidths.reduce((x, y) => x + y, -2 + maxColWidths.length * sep.length), hr)}`;
}

/**
 * Right-aligns each cell to its column width and joins them with `sep`.
 */
function formatRow(row: string[] = [], maxColWidths: number[] = [], sep = ' | ') {
    return `${row.map((x, j) => padLeft(x, maxColWidths[j])).join(sep)}`;
}

/**
 * Renders schema metadata as indented `key: value` lines. Values that are
 * themselves JSON are pretty-printed; anything else is passed through.
 */
function formatMetadata(metadata: Map<string, string>) {

    return [...metadata].map(([key, val]) =>
        ` ${key}: ${formatMetadataValue(val)}`
    ).join(', \n');

    function formatMetadataValue(value: string = '') {
        let parsed = value;
        try {
            parsed = JSON.stringify(JSON.parse(value), null, 2);
        } catch (e) { parsed = value; } // not JSON: print the raw string
        return valueToString(parsed).split('\n').join('\n ');
    }
}

/**
 * Walks every row and value in `batch`, widening `maxColWidths` in place so
 * each column can hold its widest stringified value (index 0 is the row id).
 * Returns the same (mutated) array.
 */
function measureColumnWidths(rowId: number, batch: RecordBatch, maxColWidths: number[] = []) {
    let val: any, j = 0;
    for (const row of batch) {
        if (!row) { continue; }
        // Column 0 holds the running row id.
        maxColWidths[j = 0] = Math.max(maxColWidths[0] || 0, (`${rowId++}`).length);
        for (val of row) {
            if (val && typedArrayElementWidths.has(val.constructor) && (typeof val[Symbol.toPrimitive] !== 'function')) {
                // If we're printing a column of TypedArrays, ensure the column is wide enough to accommodate
                // the widest possible element for a given byte size, since JS omits leading zeroes. For example:
                // 1 | [1137743649,2170567488,244696391,2122556476]
                // 2 | null
                // 3 | [637174007,2142281880,961736230,2912449282]
                // 4 | [1035112265,21832886,412842672,2207710517]
                // 5 | null
                // 6 | null
                // 7 | [2755142991,4192423256,2994359,467878370]
                const elementWidth = typedArrayElementWidths.get(val.constructor)!;

                maxColWidths[j + 1] = Math.max(maxColWidths[j + 1] || 0,
                    2 + // brackets on each end
                    (val.length - 1) + // commas between elements
                    (val.length * elementWidth) // width of stringified 2^N-1
                );
            } else {
                maxColWidths[j + 1] = Math.max(maxColWidths[j + 1] || 0, valueToString(val).length);
            }
            ++j;
        }
    }
    return maxColWidths;
}

// Measure the stringified representation of 2^N-1 for each TypedArray variant
const typedArrayElementWidths = (() => {
    const maxElementWidth = (ArrayType: any) => {
        const octets = Array.from({ length: ArrayType.BYTES_PER_ELEMENT - 1 }, _ => 255);
        return `${new ArrayType(new Uint8Array([...octets, 254]).buffer)[0]}`.length;
    };
    return new Map<any, number>([
        [Int8Array, maxElementWidth(Int8Array)],
        [Int16Array, maxElementWidth(Int16Array)],
        [Int32Array, maxElementWidth(Int32Array)],
        [Uint8Array, maxElementWidth(Uint8Array)],
        [Uint16Array, maxElementWidth(Uint16Array)],
        [Uint32Array, maxElementWidth(Uint32Array)],
        [Float32Array, maxElementWidth(Float32Array)],
        [Float64Array, maxElementWidth(Float64Array)],
        [Uint8ClampedArray, maxElementWidth(Uint8ClampedArray)]
    ])
})();

/**
 * Option definitions shared by the argument parser and the usage printer.
 * NOTE(review): command-line-args documents the default-value key as
 * `defaultValue`, not `default` — the `default:` entries below may be
 * ignored by the parser (harmless for usage output). Verify against the
 * library version in use.
 */
function cliOpts() {
    return [
        {
            type: String,
            name: 'schema', alias: 's',
            optional: true, multiple: true,
            typeLabel: '{underline columns}',
            description: 'A space-delimited list of column names'
        },
        {
            type: String,
            name: 'file', alias: 'f',
            optional: true, multiple: true,
            description: 'The Arrow file to read'
        },
        {
            type: String,
            name: 'sep', optional: true, default: ' | ',
            description: 'The column separator character (default: " | ")'
        },
        {
            type: String,
            name: 'hr', optional: true, default: '',
            description: 'The horizontal border character (default: "")'
        },
        {
            type: Boolean,
            name: 'metadata', alias: 'm',
            optional: true, default: false,
            description: 'Flag to print Schema metadata (default: false)'
        },
        {
            type: Boolean,
            name: 'help', optional: true, default: false,
            description: 'Print this usage guide.'
        }
    ];
}

/**
 * Prints the usage guide to stdout and returns exit code 1 (the caller
 * resolves this into the process exit status).
 */
function print_usage() {
    console.log(require('command-line-usage')([
        {
            header: 'arrow2csv',
            content: 'Print a CSV from an Arrow file'
        },
        {
            header: 'Synopsis',
            content: [
                '$ arrow2csv {underline file.arrow} [{bold --schema} column_name ...]',
                '$ arrow2csv [{bold --schema} column_name ...] [{bold --file} {underline file.arrow}]',
                '$ arrow2csv {bold -s} column_1 {bold -s} column_2 [{bold -f} {underline file.arrow}]',
                '$ arrow2csv [{bold --help}]'
            ]
        },
        {
            header: 'Options',
            optionList: cliOpts()
        },
        {
            header: 'Example',
            content: [
                '$ arrow2csv --schema foo baz --sep " , " -f simple.arrow',
                '> "row_id", "foo: Int32", "baz: Utf8"',
                '> 0, 1, "aa"',
                '> 1, null, null',
                '> 2, 3, null',
                '> 3, 4, "bbb"',
                '> 4, 5, "cccc"',
            ]
        }
    ]));
    return 1;
}