1package data
2
3import (
4	"encoding/json"
5	"errors"
6	"fmt"
7	"io"
8	"time"
9
10	"github.com/apache/arrow/go/arrow"
11	"github.com/apache/arrow/go/arrow/array"
12	"github.com/apache/arrow/go/arrow/arrio"
13	"github.com/apache/arrow/go/arrow/ipc"
14	"github.com/apache/arrow/go/arrow/memory"
15	"github.com/mattetti/filebuffer"
16)
17
18// MarshalArrow converts the Frame to an arrow table and returns a byte
19// representation of that table.
20// All fields of a Frame must be of the same length or an error is returned.
21func (f *Frame) MarshalArrow() ([]byte, error) {
22	if _, err := f.RowLen(); err != nil {
23		return nil, err
24	}
25
26	arrowFields, err := buildArrowFields(f)
27	if err != nil {
28		return nil, err
29	}
30
31	schema, err := buildArrowSchema(f, arrowFields)
32	if err != nil {
33		return nil, err
34	}
35
36	columns, err := buildArrowColumns(f, arrowFields)
37	if err != nil {
38		return nil, err
39	}
40	defer func(cols []array.Column) {
41		for _, col := range cols {
42			col.Release()
43		}
44	}(columns)
45
46	// Create a table from the schema and columns.
47	table := array.NewTable(schema, columns, -1)
48	defer table.Release()
49
50	tableReader := array.NewTableReader(table, -1)
51	defer tableReader.Release()
52
53	// Arrow tables with the Go API are written to files, so we create a fake
54	// file buffer that the FileWriter can write to. In the future, and with
55	// streaming, I think will likely be using the Arrow message type some how.
56	fb := filebuffer.New(nil)
57
58	fw, err := ipc.NewFileWriter(fb, ipc.WithSchema(tableReader.Schema()))
59	if err != nil {
60		return nil, err
61	}
62
63	for tableReader.Next() {
64		rec := tableReader.Record()
65
66		if err := fw.Write(rec); err != nil {
67			rec.Release()
68			return nil, err
69		}
70		rec.Release()
71	}
72
73	if err := fw.Close(); err != nil {
74		return nil, err
75	}
76
77	return fb.Buff.Bytes(), nil
78}
79
80// buildArrowFields builds Arrow field definitions from a Frame.
81func buildArrowFields(f *Frame) ([]arrow.Field, error) {
82	arrowFields := make([]arrow.Field, len(f.Fields))
83
84	for i, field := range f.Fields {
85		t, nullable, err := fieldToArrow(field)
86		if err != nil {
87			return nil, err
88		}
89
90		fieldMeta := map[string]string{"name": field.Name}
91
92		if field.Labels != nil {
93			if fieldMeta["labels"], err = toJSONString(field.Labels); err != nil {
94				return nil, err
95			}
96		}
97
98		if field.Config != nil {
99			str, err := toJSONString(field.Config)
100			if err != nil {
101				return nil, err
102			}
103			fieldMeta["config"] = str
104		}
105
106		arrowFields[i] = arrow.Field{
107			Name:     field.Name,
108			Type:     t,
109			Metadata: arrow.MetadataFrom(fieldMeta),
110			Nullable: nullable,
111		}
112	}
113
114	return arrowFields, nil
115}
116
117// buildArrowColumns builds Arrow columns from a Frame.
118func buildArrowColumns(f *Frame, arrowFields []arrow.Field) ([]array.Column, error) {
119	pool := memory.NewGoAllocator()
120	columns := make([]array.Column, len(f.Fields))
121
122	for fieldIdx, field := range f.Fields {
123		switch v := field.vector.(type) {
124		case *int8Vector:
125			columns[fieldIdx] = *buildInt8Column(pool, arrowFields[fieldIdx], v)
126		case *nullableInt8Vector:
127			columns[fieldIdx] = *buildNullableInt8Column(pool, arrowFields[fieldIdx], v)
128
129		case *int16Vector:
130			columns[fieldIdx] = *buildInt16Column(pool, arrowFields[fieldIdx], v)
131		case *nullableInt16Vector:
132			columns[fieldIdx] = *buildNullableInt16Column(pool, arrowFields[fieldIdx], v)
133
134		case *int32Vector:
135			columns[fieldIdx] = *buildInt32Column(pool, arrowFields[fieldIdx], v)
136		case *nullableInt32Vector:
137			columns[fieldIdx] = *buildNullableInt32Column(pool, arrowFields[fieldIdx], v)
138
139		case *int64Vector:
140			columns[fieldIdx] = *buildInt64Column(pool, arrowFields[fieldIdx], v)
141		case *nullableInt64Vector:
142			columns[fieldIdx] = *buildNullableInt64Column(pool, arrowFields[fieldIdx], v)
143
144		case *uint8Vector:
145			columns[fieldIdx] = *buildUInt8Column(pool, arrowFields[fieldIdx], v)
146		case *nullableUint8Vector:
147			columns[fieldIdx] = *buildNullableUInt8Column(pool, arrowFields[fieldIdx], v)
148
149		case *uint16Vector:
150			columns[fieldIdx] = *buildUInt16Column(pool, arrowFields[fieldIdx], v)
151		case *nullableUint16Vector:
152			columns[fieldIdx] = *buildNullableUInt16Column(pool, arrowFields[fieldIdx], v)
153
154		case *uint32Vector:
155			columns[fieldIdx] = *buildUInt32Column(pool, arrowFields[fieldIdx], v)
156		case *nullableUint32Vector:
157			columns[fieldIdx] = *buildNullableUInt32Column(pool, arrowFields[fieldIdx], v)
158
159		case *uint64Vector:
160			columns[fieldIdx] = *buildUInt64Column(pool, arrowFields[fieldIdx], v)
161		case *nullableUint64Vector:
162			columns[fieldIdx] = *buildNullableUInt64Column(pool, arrowFields[fieldIdx], v)
163
164		case *stringVector:
165			columns[fieldIdx] = *buildStringColumn(pool, arrowFields[fieldIdx], v)
166		case *nullableStringVector:
167			columns[fieldIdx] = *buildNullableStringColumn(pool, arrowFields[fieldIdx], v)
168
169		case *float32Vector:
170			columns[fieldIdx] = *buildFloat32Column(pool, arrowFields[fieldIdx], v)
171		case *nullableFloat32Vector:
172			columns[fieldIdx] = *buildNullableFloat32Column(pool, arrowFields[fieldIdx], v)
173
174		case *float64Vector:
175			columns[fieldIdx] = *buildFloat64Column(pool, arrowFields[fieldIdx], v)
176		case *nullableFloat64Vector:
177			columns[fieldIdx] = *buildNullableFloat64Column(pool, arrowFields[fieldIdx], v)
178
179		case *boolVector:
180			columns[fieldIdx] = *buildBoolColumn(pool, arrowFields[fieldIdx], v)
181		case *nullableBoolVector:
182			columns[fieldIdx] = *buildNullableBoolColumn(pool, arrowFields[fieldIdx], v)
183
184		case *timeTimeVector:
185			columns[fieldIdx] = *buildTimeColumn(pool, arrowFields[fieldIdx], v)
186		case *nullableTimeTimeVector:
187			columns[fieldIdx] = *buildNullableTimeColumn(pool, arrowFields[fieldIdx], v)
188
189		default:
190			return nil, fmt.Errorf("unsupported field vector type for conversion to arrow: %T", v)
191		}
192	}
193	return columns, nil
194}
195
196// buildArrowSchema builds an Arrow schema for a Frame.
197func buildArrowSchema(f *Frame, fs []arrow.Field) (*arrow.Schema, error) {
198	tableMetaMap := map[string]string{
199		"name":  f.Name,
200		"refId": f.RefID,
201	}
202	if f.Meta != nil {
203		str, err := toJSONString(f.Meta)
204		if err != nil {
205			return nil, err
206		}
207		tableMetaMap["meta"] = str
208	}
209	tableMeta := arrow.MetadataFrom(tableMetaMap)
210
211	return arrow.NewSchema(fs, &tableMeta), nil
212}
213
214// fieldToArrow returns the corresponding Arrow primitive type and nullable property to the fields'
215// Vector primitives.
216func fieldToArrow(f *Field) (arrow.DataType, bool, error) {
217	switch f.vector.(type) {
218	case *stringVector:
219		return &arrow.StringType{}, false, nil
220	case *nullableStringVector:
221		return &arrow.StringType{}, true, nil
222
223	// Ints
224	case *int8Vector:
225		return &arrow.Int8Type{}, false, nil
226	case *nullableInt8Vector:
227		return &arrow.Int8Type{}, true, nil
228
229	case *int16Vector:
230		return &arrow.Int16Type{}, false, nil
231	case *nullableInt16Vector:
232		return &arrow.Int16Type{}, true, nil
233
234	case *int32Vector:
235		return &arrow.Int32Type{}, false, nil
236	case *nullableInt32Vector:
237		return &arrow.Int32Type{}, true, nil
238
239	case *int64Vector:
240		return &arrow.Int64Type{}, false, nil
241	case *nullableInt64Vector:
242		return &arrow.Int64Type{}, true, nil
243
244	// Uints
245	case *uint8Vector:
246		return &arrow.Uint8Type{}, false, nil
247	case *nullableUint8Vector:
248		return &arrow.Uint8Type{}, true, nil
249
250	case *uint16Vector:
251		return &arrow.Uint16Type{}, false, nil
252	case *nullableUint16Vector:
253		return &arrow.Uint16Type{}, true, nil
254
255	case *uint32Vector:
256		return &arrow.Uint32Type{}, false, nil
257	case *nullableUint32Vector:
258		return &arrow.Uint32Type{}, true, nil
259
260	case *uint64Vector:
261		return &arrow.Uint64Type{}, false, nil
262	case *nullableUint64Vector:
263		return &arrow.Uint64Type{}, true, nil
264
265	case *float32Vector:
266		return &arrow.Float32Type{}, false, nil
267	case *nullableFloat32Vector:
268		return &arrow.Float32Type{}, true, nil
269
270	case *float64Vector:
271		return &arrow.Float64Type{}, false, nil
272	case *nullableFloat64Vector:
273		return &arrow.Float64Type{}, true, nil
274
275	case *boolVector:
276		return &arrow.BooleanType{}, false, nil
277	case *nullableBoolVector:
278		return &arrow.BooleanType{}, true, nil
279
280	case *timeTimeVector:
281		return &arrow.TimestampType{}, false, nil
282	case *nullableTimeTimeVector:
283		return &arrow.TimestampType{}, true, nil
284
285	default:
286		return nil, false, fmt.Errorf("unsupported type for conversion to arrow: %T", f.vector)
287	}
288}
289
290func getMDKey(key string, metaData arrow.Metadata) (string, bool) {
291	idx := metaData.FindKey(key)
292	if idx < 0 {
293		return "", false
294	}
295	return metaData.Values()[idx], true
296}
297
298func initializeFrameFields(schema *arrow.Schema, frame *Frame) ([]bool, error) {
299	nullable := make([]bool, len(schema.Fields()))
300	for idx, field := range schema.Fields() {
301		sdkField := &Field{
302			Name: field.Name,
303		}
304		if labelsAsString, ok := getMDKey("labels", field.Metadata); ok {
305			if err := json.Unmarshal([]byte(labelsAsString), &sdkField.Labels); err != nil {
306				return nil, err
307			}
308		}
309		if configAsString, ok := getMDKey("config", field.Metadata); ok {
310			if err := json.Unmarshal([]byte(configAsString), &sdkField.Config); err != nil {
311				return nil, err
312			}
313		}
314		nullable[idx] = field.Nullable
315		if err := initializeFrameField(field, idx, nullable, sdkField); err != nil {
316			return nil, err
317		}
318
319		frame.Fields = append(frame.Fields, sdkField)
320	}
321	return nullable, nil
322}
323
324func initializeFrameField(field arrow.Field, idx int, nullable []bool, sdkField *Field) error {
325	switch field.Type.ID() {
326	case arrow.STRING:
327		if nullable[idx] {
328			sdkField.vector = newNullableStringVector(0)
329			break
330		}
331		sdkField.vector = newStringVector(0)
332	case arrow.INT8:
333		if nullable[idx] {
334			sdkField.vector = newNullableInt8Vector(0)
335			break
336		}
337		sdkField.vector = newInt8Vector(0)
338	case arrow.INT16:
339		if nullable[idx] {
340			sdkField.vector = newNullableInt16Vector(0)
341			break
342		}
343		sdkField.vector = newInt16Vector(0)
344	case arrow.INT32:
345		if nullable[idx] {
346			sdkField.vector = newNullableInt32Vector(0)
347			break
348		}
349		sdkField.vector = newInt32Vector(0)
350	case arrow.INT64:
351		if nullable[idx] {
352			sdkField.vector = newNullableInt64Vector(0)
353			break
354		}
355		sdkField.vector = newInt64Vector(0)
356	case arrow.UINT8:
357		if nullable[idx] {
358			sdkField.vector = newNullableUint8Vector(0)
359			break
360		}
361		sdkField.vector = newUint8Vector(0)
362	case arrow.UINT16:
363		if nullable[idx] {
364			sdkField.vector = newNullableUint16Vector(0)
365			break
366		}
367		sdkField.vector = newUint16Vector(0)
368	case arrow.UINT32:
369		if nullable[idx] {
370			sdkField.vector = newNullableUint32Vector(0)
371			break
372		}
373		sdkField.vector = newUint32Vector(0)
374	case arrow.UINT64:
375		if nullable[idx] {
376			sdkField.vector = newNullableUint64Vector(0)
377			break
378		}
379		sdkField.vector = newUint64Vector(0)
380	case arrow.FLOAT32:
381		if nullable[idx] {
382			sdkField.vector = newNullableFloat32Vector(0)
383			break
384		}
385		sdkField.vector = newFloat32Vector(0)
386	case arrow.FLOAT64:
387		if nullable[idx] {
388			sdkField.vector = newNullableFloat64Vector(0)
389			break
390		}
391		sdkField.vector = newFloat64Vector(0)
392	case arrow.BOOL:
393		if nullable[idx] {
394			sdkField.vector = newNullableBoolVector(0)
395			break
396		}
397		sdkField.vector = newBoolVector(0)
398	case arrow.TIMESTAMP:
399		if nullable[idx] {
400			sdkField.vector = newNullableTimeTimeVector(0)
401			break
402		}
403		sdkField.vector = newTimeTimeVector(0)
404	default:
405		return fmt.Errorf("unsupported conversion from arrow to sdk type for arrow type %v", field.Type.ID().String())
406	}
407
408	return nil
409}
410
411func populateFrameFieldsFromRecord(record array.Record, nullable []bool, frame *Frame) error {
412	for i := 0; i < len(frame.Fields); i++ {
413		col := record.Column(i)
414		if err := parseColumn(col, i, nullable, frame); err != nil {
415			return err
416		}
417	}
418	return nil
419}
420
421func populateFrameFields(fR arrio.Reader, nullable []bool, frame *Frame) error {
422	for {
423		record, err := fR.Read()
424		if errors.Is(err, io.EOF) {
425			break
426		}
427		if err != nil {
428			return err
429		}
430
431		if err = populateFrameFieldsFromRecord(record, nullable, frame); err != nil {
432			return err
433		}
434	}
435	return nil
436}
437
438// nolint:gocyclo
439func parseColumn(col array.Interface, i int, nullable []bool, frame *Frame) error {
440	switch col.DataType().ID() {
441	case arrow.STRING:
442		v := array.NewStringData(col.Data())
443		for rIdx := 0; rIdx < col.Len(); rIdx++ {
444			if nullable[i] {
445				if v.IsNull(rIdx) {
446					var ns *string
447					frame.Fields[i].vector.Append(ns)
448					continue
449				}
450				rv := v.Value(rIdx)
451				frame.Fields[i].vector.Append(&rv)
452				continue
453			}
454			frame.Fields[i].vector.Append(v.Value(rIdx))
455		}
456	case arrow.INT8:
457		v := array.NewInt8Data(col.Data())
458		for rIdx := 0; rIdx < col.Len(); rIdx++ {
459			if nullable[i] {
460				if v.IsNull(rIdx) {
461					var ns *int8
462					frame.Fields[i].vector.Append(ns)
463					continue
464				}
465				rv := v.Value(rIdx)
466				frame.Fields[i].vector.Append(&rv)
467				continue
468			}
469			frame.Fields[i].vector.Append(v.Value(rIdx))
470		}
471	case arrow.INT16:
472		v := array.NewInt16Data(col.Data())
473		for rIdx := 0; rIdx < col.Len(); rIdx++ {
474			if nullable[i] {
475				if v.IsNull(rIdx) {
476					var ns *int16
477					frame.Fields[i].vector.Append(ns)
478					continue
479				}
480				rv := v.Value(rIdx)
481				frame.Fields[i].vector.Append(&rv)
482				continue
483			}
484			frame.Fields[i].vector.Append(v.Value(rIdx))
485		}
486	case arrow.INT32:
487		v := array.NewInt32Data(col.Data())
488		for rIdx := 0; rIdx < col.Len(); rIdx++ {
489			if nullable[i] {
490				if v.IsNull(rIdx) {
491					var ns *int32
492					frame.Fields[i].vector.Append(ns)
493					continue
494				}
495				rv := v.Value(rIdx)
496				frame.Fields[i].vector.Append(&rv)
497				continue
498			}
499			frame.Fields[i].vector.Append(v.Value(rIdx))
500		}
501	case arrow.INT64:
502		v := array.NewInt64Data(col.Data())
503		for rIdx := 0; rIdx < col.Len(); rIdx++ {
504			if nullable[i] {
505				if v.IsNull(rIdx) {
506					var ns *int64
507					frame.Fields[i].vector.Append(ns)
508					continue
509				}
510				rv := v.Value(rIdx)
511				frame.Fields[i].vector.Append(&rv)
512				continue
513			}
514			frame.Fields[i].vector.Append(v.Value(rIdx))
515		}
516	case arrow.UINT8:
517		v := array.NewUint8Data(col.Data())
518		for rIdx := 0; rIdx < col.Len(); rIdx++ {
519			if nullable[i] {
520				if v.IsNull(rIdx) {
521					var ns *uint8
522					frame.Fields[i].vector.Append(ns)
523					continue
524				}
525				rv := v.Value(rIdx)
526				frame.Fields[i].vector.Append(&rv)
527				continue
528			}
529			frame.Fields[i].vector.Append(v.Value(rIdx))
530		}
531	case arrow.UINT32:
532		v := array.NewUint32Data(col.Data())
533		for rIdx := 0; rIdx < col.Len(); rIdx++ {
534			if nullable[i] {
535				if v.IsNull(rIdx) {
536					var ns *uint32
537					frame.Fields[i].vector.Append(ns)
538					continue
539				}
540				rv := v.Value(rIdx)
541				frame.Fields[i].vector.Append(&rv)
542				continue
543			}
544			frame.Fields[i].vector.Append(v.Value(rIdx))
545		}
546	case arrow.UINT64:
547		v := array.NewUint64Data(col.Data())
548		for rIdx := 0; rIdx < col.Len(); rIdx++ {
549			if nullable[i] {
550				if v.IsNull(rIdx) {
551					var ns *uint64
552					frame.Fields[i].vector.Append(ns)
553					continue
554				}
555				rv := v.Value(rIdx)
556				frame.Fields[i].vector.Append(&rv)
557				continue
558			}
559			frame.Fields[i].vector.Append(v.Value(rIdx))
560		}
561	case arrow.UINT16:
562		v := array.NewUint16Data(col.Data())
563		for rIdx := 0; rIdx < col.Len(); rIdx++ {
564			if nullable[i] {
565				if v.IsNull(rIdx) {
566					var ns *uint16
567					frame.Fields[i].vector.Append(ns)
568					continue
569				}
570				rv := v.Value(rIdx)
571				frame.Fields[i].vector.Append(&rv)
572				continue
573			}
574			frame.Fields[i].vector.Append(v.Value(rIdx))
575		}
576	case arrow.FLOAT32:
577		v := array.NewFloat32Data(col.Data())
578		for vIdx, f := range v.Float32Values() {
579			if nullable[i] {
580				if v.IsNull(vIdx) {
581					var nf *float32
582					frame.Fields[i].vector.Append(nf)
583					continue
584				}
585				vF := f
586				frame.Fields[i].vector.Append(&vF)
587				continue
588			}
589			frame.Fields[i].vector.Append(f)
590		}
591	case arrow.FLOAT64:
592		v := array.NewFloat64Data(col.Data())
593		for vIdx, f := range v.Float64Values() {
594			if nullable[i] {
595				if v.IsNull(vIdx) {
596					var nf *float64
597					frame.Fields[i].vector.Append(nf)
598					continue
599				}
600				vF := f
601				frame.Fields[i].vector.Append(&vF)
602				continue
603			}
604			frame.Fields[i].vector.Append(f)
605		}
606	case arrow.BOOL:
607		v := array.NewBooleanData(col.Data())
608		for sIdx := 0; sIdx < col.Len(); sIdx++ {
609			if nullable[i] {
610				if v.IsNull(sIdx) {
611					var ns *bool
612					frame.Fields[i].vector.Append(ns)
613					continue
614				}
615				vB := v.Value(sIdx)
616				frame.Fields[i].vector.Append(&vB)
617				continue
618			}
619			frame.Fields[i].vector.Append(v.Value(sIdx))
620		}
621	case arrow.TIMESTAMP:
622		v := array.NewTimestampData(col.Data())
623		for vIdx, ts := range v.TimestampValues() {
624			t := time.Unix(0, int64(ts)) // nanosecond assumption
625			if nullable[i] {
626				if v.IsNull(vIdx) {
627					var nt *time.Time
628					frame.Fields[i].vector.Append(nt)
629					continue
630				}
631				frame.Fields[i].vector.Append(&t)
632				continue
633			}
634			frame.Fields[i].vector.Append(t)
635		}
636	default:
637		return fmt.Errorf("unsupported arrow type %s for conversion", col.DataType().ID())
638	}
639
640	return nil
641}
642
643func populateFrameFromSchema(schema *arrow.Schema, frame *Frame) error {
644	metaData := schema.Metadata()
645	frame.Name, _ = getMDKey("name", metaData) // No need to check ok, zero value ("") is returned
646	frame.RefID, _ = getMDKey("refId", metaData)
647
648	var err error
649	if metaAsString, ok := getMDKey("meta", metaData); ok {
650		frame.Meta, err = FrameMetaFromJSON(metaAsString)
651	}
652
653	return err
654}
655
656// FromArrowRecord converts a an Arrow record batch into a Frame.
657func FromArrowRecord(record array.Record) (*Frame, error) {
658	schema := record.Schema()
659	frame := &Frame{}
660	if err := populateFrameFromSchema(schema, frame); err != nil {
661		return nil, err
662	}
663
664	nullable, err := initializeFrameFields(schema, frame)
665	if err != nil {
666		return nil, err
667	}
668
669	if err = populateFrameFieldsFromRecord(record, nullable, frame); err != nil {
670		return nil, err
671	}
672	return frame, nil
673}
674
675// UnmarshalArrowFrame converts a byte representation of an arrow table to a Frame.
676func UnmarshalArrowFrame(b []byte) (*Frame, error) {
677	fB := filebuffer.New(b)
678	fR, err := ipc.NewFileReader(fB)
679	if err != nil {
680		return nil, err
681	}
682	defer fR.Close()
683
684	schema := fR.Schema()
685	frame := &Frame{}
686	if err := populateFrameFromSchema(schema, frame); err != nil {
687		return nil, err
688	}
689
690	nullable, err := initializeFrameFields(schema, frame)
691	if err != nil {
692		return nil, err
693	}
694
695	if err = populateFrameFields(fR, nullable, frame); err != nil {
696		return nil, err
697	}
698
699	return frame, nil
700}
701
// toJSONString encodes val with json.Marshal and returns the result as a
// string. When json.Marshal fails, the empty string and its error are
// returned.
func toJSONString(val interface{}) (string, error) {
	encoded, err := json.Marshal(val)
	if err == nil {
		return string(encoded), nil
	}
	return "", err
}
711
712// UnmarshalArrowFrames decodes a slice of Arrow encoded frames to Frames ([]*Frame) by calling
713// the UnmarshalArrow function on each encoded frame.
714// If an error occurs Frames will be nil.
715// See Frames.UnMarshalArrow() for the inverse operation.
716func UnmarshalArrowFrames(bFrames [][]byte) (Frames, error) {
717	frames := make(Frames, len(bFrames))
718	var err error
719	for i, encodedFrame := range bFrames {
720		frames[i], err = UnmarshalArrowFrame(encodedFrame)
721		if err != nil {
722			return nil, err
723		}
724	}
725	return frames, nil
726}
727
728// MarshalArrow encodes Frames into a slice of []byte using *Frame's MarshalArrow method on each Frame.
729// If an error occurs [][]byte will be nil.
730// See UnmarshalArrowFrames for the inverse operation.
731func (frames Frames) MarshalArrow() ([][]byte, error) {
732	bs := make([][]byte, len(frames))
733	var err error
734	for i, frame := range frames {
735		bs[i], err = frame.MarshalArrow()
736		if err != nil {
737			return nil, err
738		}
739	}
740	return bs, nil
741}
742