1package data 2 3import ( 4 "encoding/json" 5 "errors" 6 "fmt" 7 "io" 8 "time" 9 10 "github.com/apache/arrow/go/arrow" 11 "github.com/apache/arrow/go/arrow/array" 12 "github.com/apache/arrow/go/arrow/arrio" 13 "github.com/apache/arrow/go/arrow/ipc" 14 "github.com/apache/arrow/go/arrow/memory" 15 "github.com/mattetti/filebuffer" 16) 17 18// MarshalArrow converts the Frame to an arrow table and returns a byte 19// representation of that table. 20// All fields of a Frame must be of the same length or an error is returned. 21func (f *Frame) MarshalArrow() ([]byte, error) { 22 if _, err := f.RowLen(); err != nil { 23 return nil, err 24 } 25 26 arrowFields, err := buildArrowFields(f) 27 if err != nil { 28 return nil, err 29 } 30 31 schema, err := buildArrowSchema(f, arrowFields) 32 if err != nil { 33 return nil, err 34 } 35 36 columns, err := buildArrowColumns(f, arrowFields) 37 if err != nil { 38 return nil, err 39 } 40 defer func(cols []array.Column) { 41 for _, col := range cols { 42 col.Release() 43 } 44 }(columns) 45 46 // Create a table from the schema and columns. 47 table := array.NewTable(schema, columns, -1) 48 defer table.Release() 49 50 tableReader := array.NewTableReader(table, -1) 51 defer tableReader.Release() 52 53 // Arrow tables with the Go API are written to files, so we create a fake 54 // file buffer that the FileWriter can write to. In the future, and with 55 // streaming, I think will likely be using the Arrow message type some how. 56 fb := filebuffer.New(nil) 57 58 fw, err := ipc.NewFileWriter(fb, ipc.WithSchema(tableReader.Schema())) 59 if err != nil { 60 return nil, err 61 } 62 63 for tableReader.Next() { 64 rec := tableReader.Record() 65 66 if err := fw.Write(rec); err != nil { 67 rec.Release() 68 return nil, err 69 } 70 rec.Release() 71 } 72 73 if err := fw.Close(); err != nil { 74 return nil, err 75 } 76 77 return fb.Buff.Bytes(), nil 78} 79 80// buildArrowFields builds Arrow field definitions from a Frame. 81func buildArrowFields(f *Frame) ([]arrow.Field, error) { 82 arrowFields := make([]arrow.Field, len(f.Fields)) 83 84 for i, field := range f.Fields { 85 t, nullable, err := fieldToArrow(field) 86 if err != nil { 87 return nil, err 88 } 89 90 fieldMeta := map[string]string{"name": field.Name} 91 92 if field.Labels != nil { 93 if fieldMeta["labels"], err = toJSONString(field.Labels); err != nil { 94 return nil, err 95 } 96 } 97 98 if field.Config != nil { 99 str, err := toJSONString(field.Config) 100 if err != nil { 101 return nil, err 102 } 103 fieldMeta["config"] = str 104 } 105 106 arrowFields[i] = arrow.Field{ 107 Name: field.Name, 108 Type: t, 109 Metadata: arrow.MetadataFrom(fieldMeta), 110 Nullable: nullable, 111 } 112 } 113 114 return arrowFields, nil 115} 116 117// buildArrowColumns builds Arrow columns from a Frame. 118func buildArrowColumns(f *Frame, arrowFields []arrow.Field) ([]array.Column, error) { 119 pool := memory.NewGoAllocator() 120 columns := make([]array.Column, len(f.Fields)) 121 122 for fieldIdx, field := range f.Fields { 123 switch v := field.vector.(type) { 124 case *int8Vector: 125 columns[fieldIdx] = *buildInt8Column(pool, arrowFields[fieldIdx], v) 126 case *nullableInt8Vector: 127 columns[fieldIdx] = *buildNullableInt8Column(pool, arrowFields[fieldIdx], v) 128 129 case *int16Vector: 130 columns[fieldIdx] = *buildInt16Column(pool, arrowFields[fieldIdx], v) 131 case *nullableInt16Vector: 132 columns[fieldIdx] = *buildNullableInt16Column(pool, arrowFields[fieldIdx], v) 133 134 case *int32Vector: 135 columns[fieldIdx] = *buildInt32Column(pool, arrowFields[fieldIdx], v) 136 case *nullableInt32Vector: 137 columns[fieldIdx] = *buildNullableInt32Column(pool, arrowFields[fieldIdx], v) 138 139 case *int64Vector: 140 columns[fieldIdx] = *buildInt64Column(pool, arrowFields[fieldIdx], v) 141 case *nullableInt64Vector: 142 columns[fieldIdx] = *buildNullableInt64Column(pool, arrowFields[fieldIdx], v) 143 144 case *uint8Vector: 145 columns[fieldIdx] = *buildUInt8Column(pool, arrowFields[fieldIdx], v) 146 case *nullableUint8Vector: 147 columns[fieldIdx] = *buildNullableUInt8Column(pool, arrowFields[fieldIdx], v) 148 149 case *uint16Vector: 150 columns[fieldIdx] = *buildUInt16Column(pool, arrowFields[fieldIdx], v) 151 case *nullableUint16Vector: 152 columns[fieldIdx] = *buildNullableUInt16Column(pool, arrowFields[fieldIdx], v) 153 154 case *uint32Vector: 155 columns[fieldIdx] = *buildUInt32Column(pool, arrowFields[fieldIdx], v) 156 case *nullableUint32Vector: 157 columns[fieldIdx] = *buildNullableUInt32Column(pool, arrowFields[fieldIdx], v) 158 159 case *uint64Vector: 160 columns[fieldIdx] = *buildUInt64Column(pool, arrowFields[fieldIdx], v) 161 case *nullableUint64Vector: 162 columns[fieldIdx] = *buildNullableUInt64Column(pool, arrowFields[fieldIdx], v) 163 164 case *stringVector: 165 columns[fieldIdx] = *buildStringColumn(pool, arrowFields[fieldIdx], v) 166 case *nullableStringVector: 167 columns[fieldIdx] = *buildNullableStringColumn(pool, arrowFields[fieldIdx], v) 168 169 case *float32Vector: 170 columns[fieldIdx] = *buildFloat32Column(pool, arrowFields[fieldIdx], v) 171 case *nullableFloat32Vector: 172 columns[fieldIdx] = *buildNullableFloat32Column(pool, arrowFields[fieldIdx], v) 173 174 case *float64Vector: 175 columns[fieldIdx] = *buildFloat64Column(pool, arrowFields[fieldIdx], v) 176 case *nullableFloat64Vector: 177 columns[fieldIdx] = *buildNullableFloat64Column(pool, arrowFields[fieldIdx], v) 178 179 case *boolVector: 180 columns[fieldIdx] = *buildBoolColumn(pool, arrowFields[fieldIdx], v) 181 case *nullableBoolVector: 182 columns[fieldIdx] = *buildNullableBoolColumn(pool, arrowFields[fieldIdx], v) 183 184 case *timeTimeVector: 185 columns[fieldIdx] = *buildTimeColumn(pool, arrowFields[fieldIdx], v) 186 case *nullableTimeTimeVector: 187 columns[fieldIdx] = *buildNullableTimeColumn(pool, arrowFields[fieldIdx], v) 188 189 default: 190 return nil, fmt.Errorf("unsupported field vector type for conversion to arrow: %T", v) 191 } 192 } 193 return columns, nil 194} 195 196// buildArrowSchema builds an Arrow schema for a Frame. 197func buildArrowSchema(f *Frame, fs []arrow.Field) (*arrow.Schema, error) { 198 tableMetaMap := map[string]string{ 199 "name": f.Name, 200 "refId": f.RefID, 201 } 202 if f.Meta != nil { 203 str, err := toJSONString(f.Meta) 204 if err != nil { 205 return nil, err 206 } 207 tableMetaMap["meta"] = str 208 } 209 tableMeta := arrow.MetadataFrom(tableMetaMap) 210 211 return arrow.NewSchema(fs, &tableMeta), nil 212} 213 214// fieldToArrow returns the corresponding Arrow primitive type and nullable property to the fields' 215// Vector primitives. 216func fieldToArrow(f *Field) (arrow.DataType, bool, error) { 217 switch f.vector.(type) { 218 case *stringVector: 219 return &arrow.StringType{}, false, nil 220 case *nullableStringVector: 221 return &arrow.StringType{}, true, nil 222 223 // Ints 224 case *int8Vector: 225 return &arrow.Int8Type{}, false, nil 226 case *nullableInt8Vector: 227 return &arrow.Int8Type{}, true, nil 228 229 case *int16Vector: 230 return &arrow.Int16Type{}, false, nil 231 case *nullableInt16Vector: 232 return &arrow.Int16Type{}, true, nil 233 234 case *int32Vector: 235 return &arrow.Int32Type{}, false, nil 236 case *nullableInt32Vector: 237 return &arrow.Int32Type{}, true, nil 238 239 case *int64Vector: 240 return &arrow.Int64Type{}, false, nil 241 case *nullableInt64Vector: 242 return &arrow.Int64Type{}, true, nil 243 244 // Uints 245 case *uint8Vector: 246 return &arrow.Uint8Type{}, false, nil 247 case *nullableUint8Vector: 248 return &arrow.Uint8Type{}, true, nil 249 250 case *uint16Vector: 251 return &arrow.Uint16Type{}, false, nil 252 case *nullableUint16Vector: 253 return &arrow.Uint16Type{}, true, nil 254 255 case *uint32Vector: 256 return &arrow.Uint32Type{}, false, nil 257 case *nullableUint32Vector: 258 return &arrow.Uint32Type{}, true, nil 259 260 case *uint64Vector: 261 return &arrow.Uint64Type{}, false, nil 262 case *nullableUint64Vector: 263 return &arrow.Uint64Type{}, true, nil 264 265 case *float32Vector: 266 return &arrow.Float32Type{}, false, nil 267 case *nullableFloat32Vector: 268 return &arrow.Float32Type{}, true, nil 269 270 case *float64Vector: 271 return &arrow.Float64Type{}, false, nil 272 case *nullableFloat64Vector: 273 return &arrow.Float64Type{}, true, nil 274 275 case *boolVector: 276 return &arrow.BooleanType{}, false, nil 277 case *nullableBoolVector: 278 return &arrow.BooleanType{}, true, nil 279 280 case *timeTimeVector: 281 return &arrow.TimestampType{}, false, nil 282 case *nullableTimeTimeVector: 283 return &arrow.TimestampType{}, true, nil 284 285 default: 286 return nil, false, fmt.Errorf("unsupported type for conversion to arrow: %T", f.vector) 287 } 288} 289 290func getMDKey(key string, metaData arrow.Metadata) (string, bool) { 291 idx := metaData.FindKey(key) 292 if idx < 0 { 293 return "", false 294 } 295 return metaData.Values()[idx], true 296} 297 298func initializeFrameFields(schema *arrow.Schema, frame *Frame) ([]bool, error) { 299 nullable := make([]bool, len(schema.Fields())) 300 for idx, field := range schema.Fields() { 301 sdkField := &Field{ 302 Name: field.Name, 303 } 304 if labelsAsString, ok := getMDKey("labels", field.Metadata); ok { 305 if err := json.Unmarshal([]byte(labelsAsString), &sdkField.Labels); err != nil { 306 return nil, err 307 } 308 } 309 if configAsString, ok := getMDKey("config", field.Metadata); ok { 310 if err := json.Unmarshal([]byte(configAsString), &sdkField.Config); err != nil { 311 return nil, err 312 } 313 } 314 nullable[idx] = field.Nullable 315 if err := initializeFrameField(field, idx, nullable, sdkField); err != nil { 316 return nil, err 317 } 318 319 frame.Fields = append(frame.Fields, sdkField) 320 } 321 return nullable, nil 322} 323 324func initializeFrameField(field arrow.Field, idx int, nullable []bool, sdkField *Field) error { 325 switch field.Type.ID() { 326 case arrow.STRING: 327 if nullable[idx] { 328 sdkField.vector = newNullableStringVector(0) 329 break 330 } 331 sdkField.vector = newStringVector(0) 332 case arrow.INT8: 333 if nullable[idx] { 334 sdkField.vector = newNullableInt8Vector(0) 335 break 336 } 337 sdkField.vector = newInt8Vector(0) 338 case arrow.INT16: 339 if nullable[idx] { 340 sdkField.vector = newNullableInt16Vector(0) 341 break 342 } 343 sdkField.vector = newInt16Vector(0) 344 case arrow.INT32: 345 if nullable[idx] { 346 sdkField.vector = newNullableInt32Vector(0) 347 break 348 } 349 sdkField.vector = newInt32Vector(0) 350 case arrow.INT64: 351 if nullable[idx] { 352 sdkField.vector = newNullableInt64Vector(0) 353 break 354 } 355 sdkField.vector = newInt64Vector(0) 356 case arrow.UINT8: 357 if nullable[idx] { 358 sdkField.vector = newNullableUint8Vector(0) 359 break 360 } 361 sdkField.vector = newUint8Vector(0) 362 case arrow.UINT16: 363 if nullable[idx] { 364 sdkField.vector = newNullableUint16Vector(0) 365 break 366 } 367 sdkField.vector = newUint16Vector(0) 368 case arrow.UINT32: 369 if nullable[idx] { 370 sdkField.vector = newNullableUint32Vector(0) 371 break 372 } 373 sdkField.vector = newUint32Vector(0) 374 case arrow.UINT64: 375 if nullable[idx] { 376 sdkField.vector = newNullableUint64Vector(0) 377 break 378 } 379 sdkField.vector = newUint64Vector(0) 380 case arrow.FLOAT32: 381 if nullable[idx] { 382 sdkField.vector = newNullableFloat32Vector(0) 383 break 384 } 385 sdkField.vector = newFloat32Vector(0) 386 case arrow.FLOAT64: 387 if nullable[idx] { 388 sdkField.vector = newNullableFloat64Vector(0) 389 break 390 } 391 sdkField.vector = newFloat64Vector(0) 392 case arrow.BOOL: 393 if nullable[idx] { 394 sdkField.vector = newNullableBoolVector(0) 395 break 396 } 397 sdkField.vector = newBoolVector(0) 398 case arrow.TIMESTAMP: 399 if nullable[idx] { 400 sdkField.vector = newNullableTimeTimeVector(0) 401 break 402 } 403 sdkField.vector = newTimeTimeVector(0) 404 default: 405 return fmt.Errorf("unsupported conversion from arrow to sdk type for arrow type %v", field.Type.ID().String()) 406 } 407 408 return nil 409} 410 411func populateFrameFieldsFromRecord(record array.Record, nullable []bool, frame *Frame) error { 412 for i := 0; i < len(frame.Fields); i++ { 413 col := record.Column(i) 414 if err := parseColumn(col, i, nullable, frame); err != nil { 415 return err 416 } 417 } 418 return nil 419} 420 421func populateFrameFields(fR arrio.Reader, nullable []bool, frame *Frame) error { 422 for { 423 record, err := fR.Read() 424 if errors.Is(err, io.EOF) { 425 break 426 } 427 if err != nil { 428 return err 429 } 430 431 if err = populateFrameFieldsFromRecord(record, nullable, frame); err != nil { 432 return err 433 } 434 } 435 return nil 436} 437 438// nolint:gocyclo 439func parseColumn(col array.Interface, i int, nullable []bool, frame *Frame) error { 440 switch col.DataType().ID() { 441 case arrow.STRING: 442 v := array.NewStringData(col.Data()) 443 for rIdx := 0; rIdx < col.Len(); rIdx++ { 444 if nullable[i] { 445 if v.IsNull(rIdx) { 446 var ns *string 447 frame.Fields[i].vector.Append(ns) 448 continue 449 } 450 rv := v.Value(rIdx) 451 frame.Fields[i].vector.Append(&rv) 452 continue 453 } 454 frame.Fields[i].vector.Append(v.Value(rIdx)) 455 } 456 case arrow.INT8: 457 v := array.NewInt8Data(col.Data()) 458 for rIdx := 0; rIdx < col.Len(); rIdx++ { 459 if nullable[i] { 460 if v.IsNull(rIdx) { 461 var ns *int8 462 frame.Fields[i].vector.Append(ns) 463 continue 464 } 465 rv := v.Value(rIdx) 466 frame.Fields[i].vector.Append(&rv) 467 continue 468 } 469 frame.Fields[i].vector.Append(v.Value(rIdx)) 470 } 471 case arrow.INT16: 472 v := array.NewInt16Data(col.Data()) 473 for rIdx := 0; rIdx < col.Len(); rIdx++ { 474 if nullable[i] { 475 if v.IsNull(rIdx) { 476 var ns *int16 477 frame.Fields[i].vector.Append(ns) 478 continue 479 } 480 rv := v.Value(rIdx) 481 frame.Fields[i].vector.Append(&rv) 482 continue 483 } 484 frame.Fields[i].vector.Append(v.Value(rIdx)) 485 } 486 case arrow.INT32: 487 v := array.NewInt32Data(col.Data()) 488 for rIdx := 0; rIdx < col.Len(); rIdx++ { 489 if nullable[i] { 490 if v.IsNull(rIdx) { 491 var ns *int32 492 frame.Fields[i].vector.Append(ns) 493 continue 494 } 495 rv := v.Value(rIdx) 496 frame.Fields[i].vector.Append(&rv) 497 continue 498 } 499 frame.Fields[i].vector.Append(v.Value(rIdx)) 500 } 501 case arrow.INT64: 502 v := array.NewInt64Data(col.Data()) 503 for rIdx := 0; rIdx < col.Len(); rIdx++ { 504 if nullable[i] { 505 if v.IsNull(rIdx) { 506 var ns *int64 507 frame.Fields[i].vector.Append(ns) 508 continue 509 } 510 rv := v.Value(rIdx) 511 frame.Fields[i].vector.Append(&rv) 512 continue 513 } 514 frame.Fields[i].vector.Append(v.Value(rIdx)) 515 } 516 case arrow.UINT8: 517 v := array.NewUint8Data(col.Data()) 518 for rIdx := 0; rIdx < col.Len(); rIdx++ { 519 if nullable[i] { 520 if v.IsNull(rIdx) { 521 var ns *uint8 522 frame.Fields[i].vector.Append(ns) 523 continue 524 } 525 rv := v.Value(rIdx) 526 frame.Fields[i].vector.Append(&rv) 527 continue 528 } 529 frame.Fields[i].vector.Append(v.Value(rIdx)) 530 } 531 case arrow.UINT32: 532 v := array.NewUint32Data(col.Data()) 533 for rIdx := 0; rIdx < col.Len(); rIdx++ { 534 if nullable[i] { 535 if v.IsNull(rIdx) { 536 var ns *uint32 537 frame.Fields[i].vector.Append(ns) 538 continue 539 } 540 rv := v.Value(rIdx) 541 frame.Fields[i].vector.Append(&rv) 542 continue 543 } 544 frame.Fields[i].vector.Append(v.Value(rIdx)) 545 } 546 case arrow.UINT64: 547 v := array.NewUint64Data(col.Data()) 548 for rIdx := 0; rIdx < col.Len(); rIdx++ { 549 if nullable[i] { 550 if v.IsNull(rIdx) { 551 var ns *uint64 552 frame.Fields[i].vector.Append(ns) 553 continue 554 } 555 rv := v.Value(rIdx) 556 frame.Fields[i].vector.Append(&rv) 557 continue 558 } 559 frame.Fields[i].vector.Append(v.Value(rIdx)) 560 } 561 case arrow.UINT16: 562 v := array.NewUint16Data(col.Data()) 563 for rIdx := 0; rIdx < col.Len(); rIdx++ { 564 if nullable[i] { 565 if v.IsNull(rIdx) { 566 var ns *uint16 567 frame.Fields[i].vector.Append(ns) 568 continue 569 } 570 rv := v.Value(rIdx) 571 frame.Fields[i].vector.Append(&rv) 572 continue 573 } 574 frame.Fields[i].vector.Append(v.Value(rIdx)) 575 } 576 case arrow.FLOAT32: 577 v := array.NewFloat32Data(col.Data()) 578 for vIdx, f := range v.Float32Values() { 579 if nullable[i] { 580 if v.IsNull(vIdx) { 581 var nf *float32 582 frame.Fields[i].vector.Append(nf) 583 continue 584 } 585 vF := f 586 frame.Fields[i].vector.Append(&vF) 587 continue 588 } 589 frame.Fields[i].vector.Append(f) 590 } 591 case arrow.FLOAT64: 592 v := array.NewFloat64Data(col.Data()) 593 for vIdx, f := range v.Float64Values() { 594 if nullable[i] { 595 if v.IsNull(vIdx) { 596 var nf *float64 597 frame.Fields[i].vector.Append(nf) 598 continue 599 } 600 vF := f 601 frame.Fields[i].vector.Append(&vF) 602 continue 603 } 604 frame.Fields[i].vector.Append(f) 605 } 606 case arrow.BOOL: 607 v := array.NewBooleanData(col.Data()) 608 for sIdx := 0; sIdx < col.Len(); sIdx++ { 609 if nullable[i] { 610 if v.IsNull(sIdx) { 611 var ns *bool 612 frame.Fields[i].vector.Append(ns) 613 continue 614 } 615 vB := v.Value(sIdx) 616 frame.Fields[i].vector.Append(&vB) 617 continue 618 } 619 frame.Fields[i].vector.Append(v.Value(sIdx)) 620 } 621 case arrow.TIMESTAMP: 622 v := array.NewTimestampData(col.Data()) 623 for vIdx, ts := range v.TimestampValues() { 624 t := time.Unix(0, int64(ts)) // nanosecond assumption 625 if nullable[i] { 626 if v.IsNull(vIdx) { 627 var nt *time.Time 628 frame.Fields[i].vector.Append(nt) 629 continue 630 } 631 frame.Fields[i].vector.Append(&t) 632 continue 633 } 634 frame.Fields[i].vector.Append(t) 635 } 636 default: 637 return fmt.Errorf("unsupported arrow type %s for conversion", col.DataType().ID()) 638 } 639 640 return nil 641} 642 643func populateFrameFromSchema(schema *arrow.Schema, frame *Frame) error { 644 metaData := schema.Metadata() 645 frame.Name, _ = getMDKey("name", metaData) // No need to check ok, zero value ("") is returned 646 frame.RefID, _ = getMDKey("refId", metaData) 647 648 var err error 649 if metaAsString, ok := getMDKey("meta", metaData); ok { 650 frame.Meta, err = FrameMetaFromJSON(metaAsString) 651 } 652 653 return err 654} 655 656// FromArrowRecord converts a an Arrow record batch into a Frame. 657func FromArrowRecord(record array.Record) (*Frame, error) { 658 schema := record.Schema() 659 frame := &Frame{} 660 if err := populateFrameFromSchema(schema, frame); err != nil { 661 return nil, err 662 } 663 664 nullable, err := initializeFrameFields(schema, frame) 665 if err != nil { 666 return nil, err 667 } 668 669 if err = populateFrameFieldsFromRecord(record, nullable, frame); err != nil { 670 return nil, err 671 } 672 return frame, nil 673} 674 675// UnmarshalArrowFrame converts a byte representation of an arrow table to a Frame. 676func UnmarshalArrowFrame(b []byte) (*Frame, error) { 677 fB := filebuffer.New(b) 678 fR, err := ipc.NewFileReader(fB) 679 if err != nil { 680 return nil, err 681 } 682 defer fR.Close() 683 684 schema := fR.Schema() 685 frame := &Frame{} 686 if err := populateFrameFromSchema(schema, frame); err != nil { 687 return nil, err 688 } 689 690 nullable, err := initializeFrameFields(schema, frame) 691 if err != nil { 692 return nil, err 693 } 694 695 if err = populateFrameFields(fR, nullable, frame); err != nil { 696 return nil, err 697 } 698 699 return frame, nil 700} 701 702// ToJSONString calls json.Marshal on val and returns it as a string. An 703// error is returned if json.Marshal errors. 704func toJSONString(val interface{}) (string, error) { 705 b, err := json.Marshal(val) 706 if err != nil { 707 return "", err 708 } 709 return string(b), nil 710} 711 712// UnmarshalArrowFrames decodes a slice of Arrow encoded frames to Frames ([]*Frame) by calling 713// the UnmarshalArrow function on each encoded frame. 714// If an error occurs Frames will be nil. 715// See Frames.UnMarshalArrow() for the inverse operation. 716func UnmarshalArrowFrames(bFrames [][]byte) (Frames, error) { 717 frames := make(Frames, len(bFrames)) 718 var err error 719 for i, encodedFrame := range bFrames { 720 frames[i], err = UnmarshalArrowFrame(encodedFrame) 721 if err != nil { 722 return nil, err 723 } 724 } 725 return frames, nil 726} 727 728// MarshalArrow encodes Frames into a slice of []byte using *Frame's MarshalArrow method on each Frame. 729// If an error occurs [][]byte will be nil. 730// See UnmarshalArrowFrames for the inverse operation. 731func (frames Frames) MarshalArrow() ([][]byte, error) { 732 bs := make([][]byte, len(frames)) 733 var err error 734 for i, frame := range frames { 735 bs[i], err = frame.MarshalArrow() 736 if err != nil { 737 return nil, err 738 } 739 } 740 return bs, nil 741} 742