1// Licensed to the Apache Software Foundation (ASF) under one 2// or more contributor license agreements. See the NOTICE file 3// distributed with this work for additional information 4// regarding copyright ownership. The ASF licenses this file 5// to you under the Apache License, Version 2.0 (the 6// "License"); you may not use this file except in compliance 7// with the License. You may obtain a copy of the License at 8// 9// http://www.apache.org/licenses/LICENSE-2.0 10// 11// Unless required by applicable law or agreed to in writing, software 12// distributed under the License is distributed on an "AS IS" BASIS, 13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14// See the License for the specific language governing permissions and 15// limitations under the License. 16 17package ipc // import "github.com/apache/arrow/go/arrow/ipc" 18 19import ( 20 "encoding/binary" 21 "io" 22 "sort" 23 24 "github.com/apache/arrow/go/arrow" 25 "github.com/apache/arrow/go/arrow/internal/flatbuf" 26 "github.com/apache/arrow/go/arrow/memory" 27 flatbuffers "github.com/google/flatbuffers/go" 28 "golang.org/x/xerrors" 29) 30 31// Magic string identifying an Apache Arrow file. 32var Magic = []byte("ARROW1") 33 34const ( 35 currentMetadataVersion = MetadataV4 36 minMetadataVersion = MetadataV4 37 38 kExtensionTypeKeyName = "arrow_extension_name" 39 kExtensionDataKeyName = "arrow_extension_data" 40 41 // ARROW-109: We set this number arbitrarily to help catch user mistakes. For 42 // deeply nested schemas, it is expected the user will indicate explicitly the 43 // maximum allowed recursion depth 44 kMaxNestingDepth = 64 45) 46 47type startVecFunc func(b *flatbuffers.Builder, n int) flatbuffers.UOffsetT 48 49type fieldMetadata struct { 50 Len int64 51 Nulls int64 52 Offset int64 53} 54 55type bufferMetadata struct { 56 Offset int64 // relative offset into the memory page to the starting byte of the buffer 57 Len int64 // absolute length in bytes of the buffer 58} 59 60type fileBlock struct { 61 Offset int64 62 Meta int32 63 Body int64 64 65 r io.ReaderAt 66} 67 68func fileBlocksToFB(b *flatbuffers.Builder, blocks []fileBlock, start startVecFunc) flatbuffers.UOffsetT { 69 start(b, len(blocks)) 70 for i := len(blocks) - 1; i >= 0; i-- { 71 blk := blocks[i] 72 flatbuf.CreateBlock(b, blk.Offset, blk.Meta, blk.Body) 73 } 74 75 return b.EndVector(len(blocks)) 76} 77 78func (blk fileBlock) NewMessage() (*Message, error) { 79 var ( 80 err error 81 buf []byte 82 r = blk.section() 83 ) 84 85 buf = make([]byte, blk.Meta) 86 _, err = io.ReadFull(r, buf) 87 if err != nil { 88 return nil, xerrors.Errorf("arrow/ipc: could not read message metadata: %w", err) 89 } 90 91 prefix := 0 92 switch binary.LittleEndian.Uint32(buf) { 93 case 0: 94 case kIPCContToken: 95 prefix = 8 96 default: 97 // ARROW-6314: backwards compatibility for reading old IPC 98 // messages produced prior to version 0.15.0 99 prefix = 4 100 } 101 102 meta := memory.NewBufferBytes(buf[prefix:]) // drop buf-size already known from blk.Meta 103 104 buf = make([]byte, blk.Body) 105 _, err = io.ReadFull(r, buf) 106 if err != nil { 107 return nil, xerrors.Errorf("arrow/ipc: could not read message body: %w", err) 108 } 109 body := memory.NewBufferBytes(buf) 110 111 return NewMessage(meta, body), nil 112} 113 114func (blk fileBlock) section() io.Reader { 115 return io.NewSectionReader(blk.r, blk.Offset, int64(blk.Meta)+blk.Body) 116} 117 118func unitFromFB(unit flatbuf.TimeUnit) arrow.TimeUnit { 119 switch unit { 120 case flatbuf.TimeUnitSECOND: 121 return arrow.Second 122 case flatbuf.TimeUnitMILLISECOND: 123 return arrow.Millisecond 124 case flatbuf.TimeUnitMICROSECOND: 125 return arrow.Microsecond 126 case flatbuf.TimeUnitNANOSECOND: 127 return arrow.Nanosecond 128 default: 129 panic(xerrors.Errorf("arrow/ipc: invalid flatbuf.TimeUnit(%d) value", unit)) 130 } 131} 132 133func unitToFB(unit arrow.TimeUnit) flatbuf.TimeUnit { 134 switch unit { 135 case arrow.Second: 136 return flatbuf.TimeUnitSECOND 137 case arrow.Millisecond: 138 return flatbuf.TimeUnitMILLISECOND 139 case arrow.Microsecond: 140 return flatbuf.TimeUnitMICROSECOND 141 case arrow.Nanosecond: 142 return flatbuf.TimeUnitNANOSECOND 143 default: 144 panic(xerrors.Errorf("arrow/ipc: invalid arrow.TimeUnit(%d) value", unit)) 145 } 146} 147 148// initFB is a helper function to handle flatbuffers' polymorphism. 149func initFB(t interface { 150 Table() flatbuffers.Table 151 Init([]byte, flatbuffers.UOffsetT) 152}, f func(tbl *flatbuffers.Table) bool) { 153 tbl := t.Table() 154 if !f(&tbl) { 155 panic(xerrors.Errorf("arrow/ipc: could not initialize %T from flatbuffer", t)) 156 } 157 t.Init(tbl.Bytes, tbl.Pos) 158} 159 160func fieldFromFB(field *flatbuf.Field, memo *dictMemo) (arrow.Field, error) { 161 var ( 162 err error 163 o arrow.Field 164 ) 165 166 o.Name = string(field.Name()) 167 o.Nullable = field.Nullable() 168 o.Metadata, err = metadataFromFB(field) 169 if err != nil { 170 return o, err 171 } 172 173 encoding := field.Dictionary(nil) 174 switch encoding { 175 case nil: 176 n := field.ChildrenLength() 177 children := make([]arrow.Field, n) 178 for i := range children { 179 var childFB flatbuf.Field 180 if !field.Children(&childFB, i) { 181 return o, xerrors.Errorf("arrow/ipc: could not load field child %d", i) 182 } 183 child, err := fieldFromFB(&childFB, memo) 184 if err != nil { 185 return o, xerrors.Errorf("arrow/ipc: could not convert field child %d: %w", i, err) 186 } 187 children[i] = child 188 } 189 190 o.Type, err = typeFromFB(field, children, o.Metadata) 191 if err != nil { 192 return o, xerrors.Errorf("arrow/ipc: could not convert field type: %w", err) 193 } 194 default: 195 panic("not implemented") // FIXME(sbinet) 196 } 197 198 return o, nil 199} 200 201func fieldToFB(b *flatbuffers.Builder, field arrow.Field, memo *dictMemo) flatbuffers.UOffsetT { 202 var visitor = fieldVisitor{b: b, memo: memo, meta: make(map[string]string)} 203 return visitor.result(field) 204} 205 206type fieldVisitor struct { 207 b *flatbuffers.Builder 208 memo *dictMemo 209 dtype flatbuf.Type 210 offset flatbuffers.UOffsetT 211 kids []flatbuffers.UOffsetT 212 meta map[string]string 213} 214 215func (fv *fieldVisitor) visit(field arrow.Field) { 216 dt := field.Type 217 switch dt := dt.(type) { 218 case *arrow.NullType: 219 fv.dtype = flatbuf.TypeNull 220 flatbuf.NullStart(fv.b) 221 fv.offset = flatbuf.NullEnd(fv.b) 222 223 case *arrow.BooleanType: 224 fv.dtype = flatbuf.TypeBool 225 flatbuf.BoolStart(fv.b) 226 fv.offset = flatbuf.BoolEnd(fv.b) 227 228 case *arrow.Uint8Type: 229 fv.dtype = flatbuf.TypeInt 230 fv.offset = intToFB(fv.b, int32(dt.BitWidth()), false) 231 232 case *arrow.Uint16Type: 233 fv.dtype = flatbuf.TypeInt 234 fv.offset = intToFB(fv.b, int32(dt.BitWidth()), false) 235 236 case *arrow.Uint32Type: 237 fv.dtype = flatbuf.TypeInt 238 fv.offset = intToFB(fv.b, int32(dt.BitWidth()), false) 239 240 case *arrow.Uint64Type: 241 fv.dtype = flatbuf.TypeInt 242 fv.offset = intToFB(fv.b, int32(dt.BitWidth()), false) 243 244 case *arrow.Int8Type: 245 fv.dtype = flatbuf.TypeInt 246 fv.offset = intToFB(fv.b, int32(dt.BitWidth()), true) 247 248 case *arrow.Int16Type: 249 fv.dtype = flatbuf.TypeInt 250 fv.offset = intToFB(fv.b, int32(dt.BitWidth()), true) 251 252 case *arrow.Int32Type: 253 fv.dtype = flatbuf.TypeInt 254 fv.offset = intToFB(fv.b, int32(dt.BitWidth()), true) 255 256 case *arrow.Int64Type: 257 fv.dtype = flatbuf.TypeInt 258 fv.offset = intToFB(fv.b, int32(dt.BitWidth()), true) 259 260 case *arrow.Float16Type: 261 fv.dtype = flatbuf.TypeFloatingPoint 262 fv.offset = floatToFB(fv.b, int32(dt.BitWidth())) 263 264 case *arrow.Float32Type: 265 fv.dtype = flatbuf.TypeFloatingPoint 266 fv.offset = floatToFB(fv.b, int32(dt.BitWidth())) 267 268 case *arrow.Float64Type: 269 fv.dtype = flatbuf.TypeFloatingPoint 270 fv.offset = floatToFB(fv.b, int32(dt.BitWidth())) 271 272 case *arrow.Decimal128Type: 273 fv.dtype = flatbuf.TypeDecimal 274 flatbuf.DecimalStart(fv.b) 275 flatbuf.DecimalAddPrecision(fv.b, dt.Precision) 276 flatbuf.DecimalAddScale(fv.b, dt.Scale) 277 fv.offset = flatbuf.DecimalEnd(fv.b) 278 279 case *arrow.FixedSizeBinaryType: 280 fv.dtype = flatbuf.TypeFixedSizeBinary 281 flatbuf.FixedSizeBinaryStart(fv.b) 282 flatbuf.FixedSizeBinaryAddByteWidth(fv.b, int32(dt.ByteWidth)) 283 fv.offset = flatbuf.FixedSizeBinaryEnd(fv.b) 284 285 case *arrow.BinaryType: 286 fv.dtype = flatbuf.TypeBinary 287 flatbuf.BinaryStart(fv.b) 288 fv.offset = flatbuf.BinaryEnd(fv.b) 289 290 case *arrow.StringType: 291 fv.dtype = flatbuf.TypeUtf8 292 flatbuf.Utf8Start(fv.b) 293 fv.offset = flatbuf.Utf8End(fv.b) 294 295 case *arrow.Date32Type: 296 fv.dtype = flatbuf.TypeDate 297 flatbuf.DateStart(fv.b) 298 flatbuf.DateAddUnit(fv.b, flatbuf.DateUnitDAY) 299 fv.offset = flatbuf.DateEnd(fv.b) 300 301 case *arrow.Date64Type: 302 fv.dtype = flatbuf.TypeDate 303 flatbuf.DateStart(fv.b) 304 flatbuf.DateAddUnit(fv.b, flatbuf.DateUnitMILLISECOND) 305 fv.offset = flatbuf.DateEnd(fv.b) 306 307 case *arrow.Time32Type: 308 fv.dtype = flatbuf.TypeTime 309 flatbuf.TimeStart(fv.b) 310 flatbuf.TimeAddUnit(fv.b, unitToFB(dt.Unit)) 311 flatbuf.TimeAddBitWidth(fv.b, 32) 312 fv.offset = flatbuf.TimeEnd(fv.b) 313 314 case *arrow.Time64Type: 315 fv.dtype = flatbuf.TypeTime 316 flatbuf.TimeStart(fv.b) 317 flatbuf.TimeAddUnit(fv.b, unitToFB(dt.Unit)) 318 flatbuf.TimeAddBitWidth(fv.b, 64) 319 fv.offset = flatbuf.TimeEnd(fv.b) 320 321 case *arrow.TimestampType: 322 fv.dtype = flatbuf.TypeTimestamp 323 unit := unitToFB(dt.Unit) 324 var tz flatbuffers.UOffsetT 325 if dt.TimeZone != "" { 326 tz = fv.b.CreateString(dt.TimeZone) 327 } 328 flatbuf.TimestampStart(fv.b) 329 flatbuf.TimestampAddUnit(fv.b, unit) 330 flatbuf.TimestampAddTimezone(fv.b, tz) 331 fv.offset = flatbuf.TimestampEnd(fv.b) 332 333 case *arrow.StructType: 334 fv.dtype = flatbuf.TypeStruct_ 335 offsets := make([]flatbuffers.UOffsetT, len(dt.Fields())) 336 for i, field := range dt.Fields() { 337 offsets[i] = fieldToFB(fv.b, field, fv.memo) 338 } 339 flatbuf.Struct_Start(fv.b) 340 for i := len(offsets) - 1; i >= 0; i-- { 341 fv.b.PrependUOffsetT(offsets[i]) 342 } 343 fv.offset = flatbuf.Struct_End(fv.b) 344 fv.kids = append(fv.kids, offsets...) 345 346 case *arrow.ListType: 347 fv.dtype = flatbuf.TypeList 348 fv.kids = append(fv.kids, fieldToFB(fv.b, arrow.Field{Name: "item", Type: dt.Elem(), Nullable: field.Nullable}, fv.memo)) 349 flatbuf.ListStart(fv.b) 350 fv.offset = flatbuf.ListEnd(fv.b) 351 352 case *arrow.FixedSizeListType: 353 fv.dtype = flatbuf.TypeFixedSizeList 354 fv.kids = append(fv.kids, fieldToFB(fv.b, arrow.Field{Name: "item", Type: dt.Elem(), Nullable: field.Nullable}, fv.memo)) 355 flatbuf.FixedSizeListStart(fv.b) 356 flatbuf.FixedSizeListAddListSize(fv.b, dt.Len()) 357 fv.offset = flatbuf.FixedSizeListEnd(fv.b) 358 359 case *arrow.MonthIntervalType: 360 fv.dtype = flatbuf.TypeInterval 361 flatbuf.IntervalStart(fv.b) 362 flatbuf.IntervalAddUnit(fv.b, flatbuf.IntervalUnitYEAR_MONTH) 363 fv.offset = flatbuf.IntervalEnd(fv.b) 364 365 case *arrow.DayTimeIntervalType: 366 fv.dtype = flatbuf.TypeInterval 367 flatbuf.IntervalStart(fv.b) 368 flatbuf.IntervalAddUnit(fv.b, flatbuf.IntervalUnitDAY_TIME) 369 fv.offset = flatbuf.IntervalEnd(fv.b) 370 371 case *arrow.DurationType: 372 fv.dtype = flatbuf.TypeDuration 373 unit := unitToFB(dt.Unit) 374 flatbuf.DurationStart(fv.b) 375 flatbuf.DurationAddUnit(fv.b, unit) 376 fv.offset = flatbuf.DurationEnd(fv.b) 377 378 default: 379 err := xerrors.Errorf("arrow/ipc: invalid data type %v", dt) 380 panic(err) // FIXME(sbinet): implement all data-types. 381 } 382} 383 384func (fv *fieldVisitor) result(field arrow.Field) flatbuffers.UOffsetT { 385 nameFB := fv.b.CreateString(field.Name) 386 387 fv.visit(field) 388 389 flatbuf.FieldStartChildrenVector(fv.b, len(fv.kids)) 390 for i := len(fv.kids) - 1; i >= 0; i-- { 391 fv.b.PrependUOffsetT(fv.kids[i]) 392 } 393 kidsFB := fv.b.EndVector(len(fv.kids)) 394 395 var dictFB flatbuffers.UOffsetT 396 if field.Type.ID() == arrow.DICTIONARY { 397 panic("not implemented") // FIXME(sbinet) 398 } 399 400 var ( 401 metaFB flatbuffers.UOffsetT 402 kvs []flatbuffers.UOffsetT 403 ) 404 for i, k := range field.Metadata.Keys() { 405 v := field.Metadata.Values()[i] 406 kk := fv.b.CreateString(k) 407 vv := fv.b.CreateString(v) 408 flatbuf.KeyValueStart(fv.b) 409 flatbuf.KeyValueAddKey(fv.b, kk) 410 flatbuf.KeyValueAddValue(fv.b, vv) 411 kvs = append(kvs, flatbuf.KeyValueEnd(fv.b)) 412 } 413 { 414 keys := make([]string, 0, len(fv.meta)) 415 for k := range fv.meta { 416 keys = append(keys, k) 417 } 418 sort.Strings(keys) 419 for _, k := range keys { 420 v := fv.meta[k] 421 kk := fv.b.CreateString(k) 422 vv := fv.b.CreateString(v) 423 flatbuf.KeyValueStart(fv.b) 424 flatbuf.KeyValueAddKey(fv.b, kk) 425 flatbuf.KeyValueAddValue(fv.b, vv) 426 kvs = append(kvs, flatbuf.KeyValueEnd(fv.b)) 427 } 428 } 429 if len(kvs) > 0 { 430 flatbuf.FieldStartCustomMetadataVector(fv.b, len(kvs)) 431 for i := len(kvs) - 1; i >= 0; i-- { 432 fv.b.PrependUOffsetT(kvs[i]) 433 } 434 metaFB = fv.b.EndVector(len(kvs)) 435 } 436 437 flatbuf.FieldStart(fv.b) 438 flatbuf.FieldAddName(fv.b, nameFB) 439 flatbuf.FieldAddNullable(fv.b, field.Nullable) 440 flatbuf.FieldAddTypeType(fv.b, fv.dtype) 441 flatbuf.FieldAddType(fv.b, fv.offset) 442 flatbuf.FieldAddDictionary(fv.b, dictFB) 443 flatbuf.FieldAddChildren(fv.b, kidsFB) 444 flatbuf.FieldAddCustomMetadata(fv.b, metaFB) 445 446 offset := flatbuf.FieldEnd(fv.b) 447 448 return offset 449} 450 451func fieldFromFBDict(field *flatbuf.Field) (arrow.Field, error) { 452 var ( 453 o = arrow.Field{ 454 Name: string(field.Name()), 455 Nullable: field.Nullable(), 456 } 457 err error 458 memo = newMemo() 459 ) 460 461 // any DictionaryEncoding set is ignored here. 462 463 kids := make([]arrow.Field, field.ChildrenLength()) 464 for i := range kids { 465 var kid flatbuf.Field 466 if !field.Children(&kid, i) { 467 return o, xerrors.Errorf("arrow/ipc: could not load field child %d", i) 468 } 469 kids[i], err = fieldFromFB(&kid, &memo) 470 if err != nil { 471 return o, xerrors.Errorf("arrow/ipc: field from dict: %w", err) 472 } 473 } 474 475 meta, err := metadataFromFB(field) 476 if err != nil { 477 return o, xerrors.Errorf("arrow/ipc: metadata for field from dict: %w", err) 478 } 479 480 o.Type, err = typeFromFB(field, kids, meta) 481 if err != nil { 482 return o, xerrors.Errorf("arrow/ipc: type for field from dict: %w", err) 483 } 484 485 return o, nil 486} 487 488func typeFromFB(field *flatbuf.Field, children []arrow.Field, md arrow.Metadata) (arrow.DataType, error) { 489 var data flatbuffers.Table 490 if !field.Type(&data) { 491 return nil, xerrors.Errorf("arrow/ipc: could not load field type data") 492 } 493 494 dt, err := concreteTypeFromFB(field.TypeType(), data, children) 495 if err != nil { 496 return dt, err 497 } 498 499 // look for extension metadata in custom metadata field. 500 if md.Len() > 0 { 501 i := md.FindKey(kExtensionTypeKeyName) 502 if i < 0 { 503 return dt, err 504 } 505 506 panic("not implemented") // FIXME(sbinet) 507 } 508 509 return dt, err 510} 511 512func concreteTypeFromFB(typ flatbuf.Type, data flatbuffers.Table, children []arrow.Field) (arrow.DataType, error) { 513 var ( 514 dt arrow.DataType 515 err error 516 ) 517 518 switch typ { 519 case flatbuf.TypeNONE: 520 return nil, xerrors.Errorf("arrow/ipc: Type metadata cannot be none") 521 522 case flatbuf.TypeNull: 523 return arrow.Null, nil 524 525 case flatbuf.TypeInt: 526 var dt flatbuf.Int 527 dt.Init(data.Bytes, data.Pos) 528 return intFromFB(dt) 529 530 case flatbuf.TypeFloatingPoint: 531 var dt flatbuf.FloatingPoint 532 dt.Init(data.Bytes, data.Pos) 533 return floatFromFB(dt) 534 535 case flatbuf.TypeDecimal: 536 var dt flatbuf.Decimal 537 dt.Init(data.Bytes, data.Pos) 538 return decimalFromFB(dt) 539 540 case flatbuf.TypeBinary: 541 return arrow.BinaryTypes.Binary, nil 542 543 case flatbuf.TypeFixedSizeBinary: 544 var dt flatbuf.FixedSizeBinary 545 dt.Init(data.Bytes, data.Pos) 546 return &arrow.FixedSizeBinaryType{ByteWidth: int(dt.ByteWidth())}, nil 547 548 case flatbuf.TypeUtf8: 549 return arrow.BinaryTypes.String, nil 550 551 case flatbuf.TypeBool: 552 return arrow.FixedWidthTypes.Boolean, nil 553 554 case flatbuf.TypeList: 555 if len(children) != 1 { 556 return nil, xerrors.Errorf("arrow/ipc: List must have exactly 1 child field (got=%d)", len(children)) 557 } 558 return arrow.ListOf(children[0].Type), nil 559 560 case flatbuf.TypeFixedSizeList: 561 var dt flatbuf.FixedSizeList 562 dt.Init(data.Bytes, data.Pos) 563 if len(children) != 1 { 564 return nil, xerrors.Errorf("arrow/ipc: FixedSizeList must have exactly 1 child field (got=%d)", len(children)) 565 } 566 return arrow.FixedSizeListOf(dt.ListSize(), children[0].Type), nil 567 568 case flatbuf.TypeStruct_: 569 return arrow.StructOf(children...), nil 570 571 case flatbuf.TypeTime: 572 var dt flatbuf.Time 573 dt.Init(data.Bytes, data.Pos) 574 return timeFromFB(dt) 575 576 case flatbuf.TypeTimestamp: 577 var dt flatbuf.Timestamp 578 dt.Init(data.Bytes, data.Pos) 579 return timestampFromFB(dt) 580 581 case flatbuf.TypeDate: 582 var dt flatbuf.Date 583 dt.Init(data.Bytes, data.Pos) 584 return dateFromFB(dt) 585 586 case flatbuf.TypeInterval: 587 var dt flatbuf.Interval 588 dt.Init(data.Bytes, data.Pos) 589 return intervalFromFB(dt) 590 591 case flatbuf.TypeDuration: 592 var dt flatbuf.Duration 593 dt.Init(data.Bytes, data.Pos) 594 return durationFromFB(dt) 595 596 default: 597 // FIXME(sbinet): implement all the other types. 598 panic(xerrors.Errorf("arrow/ipc: type %v not implemented", flatbuf.EnumNamesType[typ])) 599 } 600 601 return dt, err 602} 603 604func intFromFB(data flatbuf.Int) (arrow.DataType, error) { 605 bw := data.BitWidth() 606 if bw > 64 { 607 return nil, xerrors.Errorf("arrow/ipc: integers with more than 64 bits not implemented (bits=%d)", bw) 608 } 609 if bw < 8 { 610 return nil, xerrors.Errorf("arrow/ipc: integers with less than 8 bits not implemented (bits=%d)", bw) 611 } 612 613 switch bw { 614 case 8: 615 if !data.IsSigned() { 616 return arrow.PrimitiveTypes.Uint8, nil 617 } 618 return arrow.PrimitiveTypes.Int8, nil 619 620 case 16: 621 if !data.IsSigned() { 622 return arrow.PrimitiveTypes.Uint16, nil 623 } 624 return arrow.PrimitiveTypes.Int16, nil 625 626 case 32: 627 if !data.IsSigned() { 628 return arrow.PrimitiveTypes.Uint32, nil 629 } 630 return arrow.PrimitiveTypes.Int32, nil 631 632 case 64: 633 if !data.IsSigned() { 634 return arrow.PrimitiveTypes.Uint64, nil 635 } 636 return arrow.PrimitiveTypes.Int64, nil 637 default: 638 return nil, xerrors.Errorf("arrow/ipc: integers not in cstdint are not implemented") 639 } 640} 641 642func intToFB(b *flatbuffers.Builder, bw int32, isSigned bool) flatbuffers.UOffsetT { 643 flatbuf.IntStart(b) 644 flatbuf.IntAddBitWidth(b, bw) 645 flatbuf.IntAddIsSigned(b, isSigned) 646 return flatbuf.IntEnd(b) 647} 648 649func floatFromFB(data flatbuf.FloatingPoint) (arrow.DataType, error) { 650 switch p := data.Precision(); p { 651 case flatbuf.PrecisionHALF: 652 return arrow.FixedWidthTypes.Float16, nil 653 case flatbuf.PrecisionSINGLE: 654 return arrow.PrimitiveTypes.Float32, nil 655 case flatbuf.PrecisionDOUBLE: 656 return arrow.PrimitiveTypes.Float64, nil 657 default: 658 return nil, xerrors.Errorf("arrow/ipc: floating point type with %d precision not implemented", p) 659 } 660} 661 662func floatToFB(b *flatbuffers.Builder, bw int32) flatbuffers.UOffsetT { 663 switch bw { 664 case 16: 665 flatbuf.FloatingPointStart(b) 666 flatbuf.FloatingPointAddPrecision(b, flatbuf.PrecisionHALF) 667 return flatbuf.FloatingPointEnd(b) 668 case 32: 669 flatbuf.FloatingPointStart(b) 670 flatbuf.FloatingPointAddPrecision(b, flatbuf.PrecisionSINGLE) 671 return flatbuf.FloatingPointEnd(b) 672 case 64: 673 flatbuf.FloatingPointStart(b) 674 flatbuf.FloatingPointAddPrecision(b, flatbuf.PrecisionDOUBLE) 675 return flatbuf.FloatingPointEnd(b) 676 default: 677 panic(xerrors.Errorf("arrow/ipc: invalid floating point precision %d-bits", bw)) 678 } 679} 680 681func decimalFromFB(data flatbuf.Decimal) (arrow.DataType, error) { 682 return &arrow.Decimal128Type{Precision: data.Precision(), Scale: data.Scale()}, nil 683} 684 685func timeFromFB(data flatbuf.Time) (arrow.DataType, error) { 686 bw := data.BitWidth() 687 unit := unitFromFB(data.Unit()) 688 689 switch bw { 690 case 32: 691 switch unit { 692 case arrow.Millisecond: 693 return arrow.FixedWidthTypes.Time32ms, nil 694 case arrow.Second: 695 return arrow.FixedWidthTypes.Time32s, nil 696 default: 697 return nil, xerrors.Errorf("arrow/ipc: Time32 type with %v unit not implemented", unit) 698 } 699 case 64: 700 switch unit { 701 case arrow.Nanosecond: 702 return arrow.FixedWidthTypes.Time64ns, nil 703 case arrow.Microsecond: 704 return arrow.FixedWidthTypes.Time64us, nil 705 default: 706 return nil, xerrors.Errorf("arrow/ipc: Time64 type with %v unit not implemented", unit) 707 } 708 default: 709 return nil, xerrors.Errorf("arrow/ipc: Time type with %d bitwidth not implemented", bw) 710 } 711} 712 713func timestampFromFB(data flatbuf.Timestamp) (arrow.DataType, error) { 714 unit := unitFromFB(data.Unit()) 715 tz := string(data.Timezone()) 716 return &arrow.TimestampType{Unit: unit, TimeZone: tz}, nil 717} 718 719func dateFromFB(data flatbuf.Date) (arrow.DataType, error) { 720 switch data.Unit() { 721 case flatbuf.DateUnitDAY: 722 return arrow.FixedWidthTypes.Date32, nil 723 case flatbuf.DateUnitMILLISECOND: 724 return arrow.FixedWidthTypes.Date64, nil 725 } 726 return nil, xerrors.Errorf("arrow/ipc: Date type with %d unit not implemented", data.Unit()) 727} 728 729func intervalFromFB(data flatbuf.Interval) (arrow.DataType, error) { 730 switch data.Unit() { 731 case flatbuf.IntervalUnitYEAR_MONTH: 732 return arrow.FixedWidthTypes.MonthInterval, nil 733 case flatbuf.IntervalUnitDAY_TIME: 734 return arrow.FixedWidthTypes.DayTimeInterval, nil 735 } 736 return nil, xerrors.Errorf("arrow/ipc: Interval type with %d unit not implemented", data.Unit()) 737} 738 739func durationFromFB(data flatbuf.Duration) (arrow.DataType, error) { 740 switch data.Unit() { 741 case flatbuf.TimeUnitSECOND: 742 return arrow.FixedWidthTypes.Duration_s, nil 743 case flatbuf.TimeUnitMILLISECOND: 744 return arrow.FixedWidthTypes.Duration_ms, nil 745 case flatbuf.TimeUnitMICROSECOND: 746 return arrow.FixedWidthTypes.Duration_us, nil 747 case flatbuf.TimeUnitNANOSECOND: 748 return arrow.FixedWidthTypes.Duration_ns, nil 749 } 750 return nil, xerrors.Errorf("arrow/ipc: Duration type with %d unit not implemented", data.Unit()) 751} 752 753type customMetadataer interface { 754 CustomMetadataLength() int 755 CustomMetadata(*flatbuf.KeyValue, int) bool 756} 757 758func metadataFromFB(md customMetadataer) (arrow.Metadata, error) { 759 var ( 760 keys = make([]string, md.CustomMetadataLength()) 761 vals = make([]string, md.CustomMetadataLength()) 762 ) 763 764 for i := range keys { 765 var kv flatbuf.KeyValue 766 if !md.CustomMetadata(&kv, i) { 767 return arrow.Metadata{}, xerrors.Errorf("arrow/ipc: could not read key-value %d from flatbuffer", i) 768 } 769 keys[i] = string(kv.Key()) 770 vals[i] = string(kv.Value()) 771 } 772 773 return arrow.NewMetadata(keys, vals), nil 774} 775 776func metadataToFB(b *flatbuffers.Builder, meta arrow.Metadata, start startVecFunc) flatbuffers.UOffsetT { 777 if meta.Len() == 0 { 778 return 0 779 } 780 781 n := meta.Len() 782 kvs := make([]flatbuffers.UOffsetT, n) 783 for i := range kvs { 784 k := b.CreateString(meta.Keys()[i]) 785 v := b.CreateString(meta.Values()[i]) 786 flatbuf.KeyValueStart(b) 787 flatbuf.KeyValueAddKey(b, k) 788 flatbuf.KeyValueAddValue(b, v) 789 kvs[i] = flatbuf.KeyValueEnd(b) 790 } 791 792 start(b, n) 793 for i := n - 1; i >= 0; i-- { 794 b.PrependUOffsetT(kvs[i]) 795 } 796 return b.EndVector(n) 797} 798 799func schemaFromFB(schema *flatbuf.Schema, memo *dictMemo) (*arrow.Schema, error) { 800 var ( 801 err error 802 fields = make([]arrow.Field, schema.FieldsLength()) 803 ) 804 805 for i := range fields { 806 var field flatbuf.Field 807 if !schema.Fields(&field, i) { 808 return nil, xerrors.Errorf("arrow/ipc: could not read field %d from schema", i) 809 } 810 811 fields[i], err = fieldFromFB(&field, memo) 812 if err != nil { 813 return nil, xerrors.Errorf("arrow/ipc: could not convert field %d from flatbuf: %w", i, err) 814 } 815 } 816 817 md, err := metadataFromFB(schema) 818 if err != nil { 819 return nil, xerrors.Errorf("arrow/ipc: could not convert schema metadata from flatbuf: %w", err) 820 } 821 822 return arrow.NewSchema(fields, &md), nil 823} 824 825func schemaToFB(b *flatbuffers.Builder, schema *arrow.Schema, memo *dictMemo) flatbuffers.UOffsetT { 826 fields := make([]flatbuffers.UOffsetT, len(schema.Fields())) 827 for i, field := range schema.Fields() { 828 fields[i] = fieldToFB(b, field, memo) 829 } 830 831 flatbuf.SchemaStartFieldsVector(b, len(fields)) 832 for i := len(fields) - 1; i >= 0; i-- { 833 b.PrependUOffsetT(fields[i]) 834 } 835 fieldsFB := b.EndVector(len(fields)) 836 837 metaFB := metadataToFB(b, schema.Metadata(), flatbuf.SchemaStartCustomMetadataVector) 838 839 flatbuf.SchemaStart(b) 840 flatbuf.SchemaAddEndianness(b, flatbuf.EndiannessLittle) 841 flatbuf.SchemaAddFields(b, fieldsFB) 842 flatbuf.SchemaAddCustomMetadata(b, metaFB) 843 offset := flatbuf.SchemaEnd(b) 844 845 return offset 846} 847 848func dictTypesFromFB(schema *flatbuf.Schema) (dictTypeMap, error) { 849 var ( 850 err error 851 fields = make(dictTypeMap, schema.FieldsLength()) 852 ) 853 for i := 0; i < schema.FieldsLength(); i++ { 854 var field flatbuf.Field 855 if !schema.Fields(&field, i) { 856 return nil, xerrors.Errorf("arrow/ipc: could not load field %d from schema", i) 857 } 858 fields, err = visitField(&field, fields) 859 if err != nil { 860 return nil, xerrors.Errorf("arrow/ipc: could not visit field %d from schema: %w", i, err) 861 } 862 } 863 return fields, err 864} 865 866func visitField(field *flatbuf.Field, dict dictTypeMap) (dictTypeMap, error) { 867 var err error 868 meta := field.Dictionary(nil) 869 switch meta { 870 case nil: 871 // field is not dictionary encoded. 872 // => visit children. 873 for i := 0; i < field.ChildrenLength(); i++ { 874 var child flatbuf.Field 875 if !field.Children(&child, i) { 876 return nil, xerrors.Errorf("arrow/ipc: could not visit child %d from field", i) 877 } 878 dict, err = visitField(&child, dict) 879 if err != nil { 880 return nil, err 881 } 882 } 883 default: 884 // field is dictionary encoded. 885 // construct the data type for the dictionary: no descendants can be dict-encoded. 886 dfield, err := fieldFromFBDict(field) 887 if err != nil { 888 return nil, xerrors.Errorf("arrow/ipc: could not create data type for dictionary: %w", err) 889 } 890 dict[meta.Id()] = dfield 891 } 892 return dict, err 893} 894 895// payloadsFromSchema returns a slice of payloads corresponding to the given schema. 896// Callers of payloadsFromSchema will need to call Release after use. 897func payloadsFromSchema(schema *arrow.Schema, mem memory.Allocator, memo *dictMemo) payloads { 898 dict := newMemo() 899 900 ps := make(payloads, 1, dict.Len()+1) 901 ps[0].msg = MessageSchema 902 ps[0].meta = writeSchemaMessage(schema, mem, &dict) 903 904 // append dictionaries. 905 if dict.Len() > 0 { 906 panic("payloads-from-schema: not-implemented") 907 // for id, arr := range dict.id2dict { 908 // // GetSchemaPayloads: writer.cc:535 909 // } 910 } 911 912 if memo != nil { 913 *memo = dict 914 } 915 916 return ps 917} 918 919func writeFBBuilder(b *flatbuffers.Builder, mem memory.Allocator) *memory.Buffer { 920 raw := b.FinishedBytes() 921 buf := memory.NewResizableBuffer(mem) 922 buf.Resize(len(raw)) 923 copy(buf.Bytes(), raw) 924 return buf 925} 926 927func writeMessageFB(b *flatbuffers.Builder, mem memory.Allocator, hdrType flatbuf.MessageHeader, hdr flatbuffers.UOffsetT, bodyLen int64) *memory.Buffer { 928 929 flatbuf.MessageStart(b) 930 flatbuf.MessageAddVersion(b, int16(currentMetadataVersion)) 931 flatbuf.MessageAddHeaderType(b, hdrType) 932 flatbuf.MessageAddHeader(b, hdr) 933 flatbuf.MessageAddBodyLength(b, bodyLen) 934 msg := flatbuf.MessageEnd(b) 935 b.Finish(msg) 936 937 return writeFBBuilder(b, mem) 938} 939 940func writeSchemaMessage(schema *arrow.Schema, mem memory.Allocator, dict *dictMemo) *memory.Buffer { 941 b := flatbuffers.NewBuilder(1024) 942 schemaFB := schemaToFB(b, schema, dict) 943 return writeMessageFB(b, mem, flatbuf.MessageHeaderSchema, schemaFB, 0) 944} 945 946func writeFileFooter(schema *arrow.Schema, dicts, recs []fileBlock, w io.Writer) error { 947 var ( 948 b = flatbuffers.NewBuilder(1024) 949 memo = newMemo() 950 ) 951 952 schemaFB := schemaToFB(b, schema, &memo) 953 dictsFB := fileBlocksToFB(b, dicts, flatbuf.FooterStartDictionariesVector) 954 recsFB := fileBlocksToFB(b, recs, flatbuf.FooterStartRecordBatchesVector) 955 956 flatbuf.FooterStart(b) 957 flatbuf.FooterAddVersion(b, int16(currentMetadataVersion)) 958 flatbuf.FooterAddSchema(b, schemaFB) 959 flatbuf.FooterAddDictionaries(b, dictsFB) 960 flatbuf.FooterAddRecordBatches(b, recsFB) 961 footer := flatbuf.FooterEnd(b) 962 963 b.Finish(footer) 964 965 _, err := w.Write(b.FinishedBytes()) 966 return err 967} 968 969func writeRecordMessage(mem memory.Allocator, size, bodyLength int64, fields []fieldMetadata, meta []bufferMetadata) *memory.Buffer { 970 b := flatbuffers.NewBuilder(0) 971 recFB := recordToFB(b, size, bodyLength, fields, meta) 972 return writeMessageFB(b, mem, flatbuf.MessageHeaderRecordBatch, recFB, bodyLength) 973} 974 975func recordToFB(b *flatbuffers.Builder, size, bodyLength int64, fields []fieldMetadata, meta []bufferMetadata) flatbuffers.UOffsetT { 976 fieldsFB := writeFieldNodes(b, fields, flatbuf.RecordBatchStartNodesVector) 977 metaFB := writeBuffers(b, meta, flatbuf.RecordBatchStartBuffersVector) 978 979 flatbuf.RecordBatchStart(b) 980 flatbuf.RecordBatchAddLength(b, size) 981 flatbuf.RecordBatchAddNodes(b, fieldsFB) 982 flatbuf.RecordBatchAddBuffers(b, metaFB) 983 return flatbuf.RecordBatchEnd(b) 984} 985 986func writeFieldNodes(b *flatbuffers.Builder, fields []fieldMetadata, start startVecFunc) flatbuffers.UOffsetT { 987 988 start(b, len(fields)) 989 for i := len(fields) - 1; i >= 0; i-- { 990 field := fields[i] 991 if field.Offset != 0 { 992 panic(xerrors.Errorf("arrow/ipc: field metadata for IPC must have offset 0")) 993 } 994 flatbuf.CreateFieldNode(b, field.Len, field.Nulls) 995 } 996 997 return b.EndVector(len(fields)) 998} 999 1000func writeBuffers(b *flatbuffers.Builder, buffers []bufferMetadata, start startVecFunc) flatbuffers.UOffsetT { 1001 start(b, len(buffers)) 1002 for i := len(buffers) - 1; i >= 0; i-- { 1003 buffer := buffers[i] 1004 flatbuf.CreateBuffer(b, buffer.Offset, buffer.Len) 1005 } 1006 return b.EndVector(len(buffers)) 1007} 1008 1009func writeMessage(msg *memory.Buffer, alignment int32, w io.Writer) (int, error) { 1010 var ( 1011 n int 1012 err error 1013 ) 1014 1015 // ARROW-3212: we do not make any assumption on whether the output stream is aligned or not. 1016 paddedMsgLen := int32(msg.Len()) + 8 1017 remainder := paddedMsgLen % alignment 1018 if remainder != 0 { 1019 paddedMsgLen += alignment - remainder 1020 } 1021 1022 tmp := make([]byte, 4) 1023 1024 // write continuation indicator, to address 8-byte alignment requirement from FlatBuffers. 1025 binary.LittleEndian.PutUint32(tmp, kIPCContToken) 1026 _, err = w.Write(tmp) 1027 if err != nil { 1028 return 0, xerrors.Errorf("arrow/ipc: could not write continuation bit indicator: %w", err) 1029 } 1030 1031 // the returned message size includes the length prefix, the flatbuffer, + padding 1032 n = int(paddedMsgLen) 1033 1034 // write the flatbuffer size prefix, including padding 1035 sizeFB := paddedMsgLen - 8 1036 binary.LittleEndian.PutUint32(tmp, uint32(sizeFB)) 1037 _, err = w.Write(tmp) 1038 if err != nil { 1039 return n, xerrors.Errorf("arrow/ipc: could not write message flatbuffer size prefix: %w", err) 1040 } 1041 1042 // write the flatbuffer 1043 _, err = w.Write(msg.Bytes()) 1044 if err != nil { 1045 return n, xerrors.Errorf("arrow/ipc: could not write message flatbuffer: %w", err) 1046 } 1047 1048 // write any padding 1049 padding := paddedMsgLen - int32(msg.Len()) - 8 1050 if padding > 0 { 1051 _, err = w.Write(paddingBytes[:padding]) 1052 if err != nil { 1053 return n, xerrors.Errorf("arrow/ipc: could not write message padding bytes: %w", err) 1054 } 1055 } 1056 1057 return n, err 1058} 1059