1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17package ipc // import "github.com/apache/arrow/go/arrow/ipc"
18
19import (
20	"encoding/binary"
21	"io"
22	"sort"
23
24	"github.com/apache/arrow/go/arrow"
25	"github.com/apache/arrow/go/arrow/internal/flatbuf"
26	"github.com/apache/arrow/go/arrow/memory"
27	flatbuffers "github.com/google/flatbuffers/go"
28	"golang.org/x/xerrors"
29)
30
// Magic is the byte sequence that identifies an Apache Arrow file.
var Magic = []byte{'A', 'R', 'R', 'O', 'W', '1'}
33
34const (
35	currentMetadataVersion = MetadataV4
36	minMetadataVersion     = MetadataV4
37
38	kExtensionTypeKeyName = "arrow_extension_name"
39	kExtensionDataKeyName = "arrow_extension_data"
40
41	// ARROW-109: We set this number arbitrarily to help catch user mistakes. For
42	// deeply nested schemas, it is expected the user will indicate explicitly the
43	// maximum allowed recursion depth
44	kMaxNestingDepth = 64
45)
46
// startVecFunc has the shape of the generated flatbuf "Start...Vector"
// helpers (e.g. flatbuf.FooterStartDictionariesVector or
// flatbuf.RecordBatchStartNodesVector), so that a single vector writer can
// serve several different vector fields.
type startVecFunc func(b *flatbuffers.Builder, n int) flatbuffers.UOffsetT
48
// fieldMetadata describes one field node of a record batch.
// writeFieldNodes only accepts nodes whose Offset is 0.
type fieldMetadata struct {
	// Len is the number of slots in the field.
	Len int64
	// Nulls is the number of null slots in the field.
	Nulls int64
	// Offset is the slot offset; must be 0 when written to IPC.
	Offset int64
}
54
// bufferMetadata locates one buffer of a record batch body; it is serialized
// by writeBuffers as a flatbuf Buffer (offset, length) pair.
type bufferMetadata struct {
	Offset int64 // relative offset into the memory page to the starting byte of the buffer
	Len    int64 // absolute length in bytes of the buffer
}
59
// fileBlock locates one encapsulated message inside an Arrow file. The
// message occupies Meta+Body bytes starting at Offset in r.
type fileBlock struct {
	// Offset is the position of the message's first byte in r.
	Offset int64
	// Meta is the size in bytes of the metadata, including its length prefix.
	Meta int32
	// Body is the size in bytes of the message body following the metadata.
	Body int64

	// r is the source the message bytes are read from.
	r io.ReaderAt
}
67
68func fileBlocksToFB(b *flatbuffers.Builder, blocks []fileBlock, start startVecFunc) flatbuffers.UOffsetT {
69	start(b, len(blocks))
70	for i := len(blocks) - 1; i >= 0; i-- {
71		blk := blocks[i]
72		flatbuf.CreateBlock(b, blk.Offset, blk.Meta, blk.Body)
73	}
74
75	return b.EndVector(len(blocks))
76}
77
78func (blk fileBlock) NewMessage() (*Message, error) {
79	var (
80		err error
81		buf []byte
82		r   = blk.section()
83	)
84
85	buf = make([]byte, blk.Meta)
86	_, err = io.ReadFull(r, buf)
87	if err != nil {
88		return nil, xerrors.Errorf("arrow/ipc: could not read message metadata: %w", err)
89	}
90
91	prefix := 0
92	switch binary.LittleEndian.Uint32(buf) {
93	case 0:
94	case kIPCContToken:
95		prefix = 8
96	default:
97		// ARROW-6314: backwards compatibility for reading old IPC
98		// messages produced prior to version 0.15.0
99		prefix = 4
100	}
101
102	meta := memory.NewBufferBytes(buf[prefix:]) // drop buf-size already known from blk.Meta
103
104	buf = make([]byte, blk.Body)
105	_, err = io.ReadFull(r, buf)
106	if err != nil {
107		return nil, xerrors.Errorf("arrow/ipc: could not read message body: %w", err)
108	}
109	body := memory.NewBufferBytes(buf)
110
111	return NewMessage(meta, body), nil
112}
113
114func (blk fileBlock) section() io.Reader {
115	return io.NewSectionReader(blk.r, blk.Offset, int64(blk.Meta)+blk.Body)
116}
117
118func unitFromFB(unit flatbuf.TimeUnit) arrow.TimeUnit {
119	switch unit {
120	case flatbuf.TimeUnitSECOND:
121		return arrow.Second
122	case flatbuf.TimeUnitMILLISECOND:
123		return arrow.Millisecond
124	case flatbuf.TimeUnitMICROSECOND:
125		return arrow.Microsecond
126	case flatbuf.TimeUnitNANOSECOND:
127		return arrow.Nanosecond
128	default:
129		panic(xerrors.Errorf("arrow/ipc: invalid flatbuf.TimeUnit(%d) value", unit))
130	}
131}
132
133func unitToFB(unit arrow.TimeUnit) flatbuf.TimeUnit {
134	switch unit {
135	case arrow.Second:
136		return flatbuf.TimeUnitSECOND
137	case arrow.Millisecond:
138		return flatbuf.TimeUnitMILLISECOND
139	case arrow.Microsecond:
140		return flatbuf.TimeUnitMICROSECOND
141	case arrow.Nanosecond:
142		return flatbuf.TimeUnitNANOSECOND
143	default:
144		panic(xerrors.Errorf("arrow/ipc: invalid arrow.TimeUnit(%d) value", unit))
145	}
146}
147
148// initFB is a helper function to handle flatbuffers' polymorphism.
149func initFB(t interface {
150	Table() flatbuffers.Table
151	Init([]byte, flatbuffers.UOffsetT)
152}, f func(tbl *flatbuffers.Table) bool) {
153	tbl := t.Table()
154	if !f(&tbl) {
155		panic(xerrors.Errorf("arrow/ipc: could not initialize %T from flatbuffer", t))
156	}
157	t.Init(tbl.Bytes, tbl.Pos)
158}
159
160func fieldFromFB(field *flatbuf.Field, memo *dictMemo) (arrow.Field, error) {
161	var (
162		err error
163		o   arrow.Field
164	)
165
166	o.Name = string(field.Name())
167	o.Nullable = field.Nullable()
168	o.Metadata, err = metadataFromFB(field)
169	if err != nil {
170		return o, err
171	}
172
173	encoding := field.Dictionary(nil)
174	switch encoding {
175	case nil:
176		n := field.ChildrenLength()
177		children := make([]arrow.Field, n)
178		for i := range children {
179			var childFB flatbuf.Field
180			if !field.Children(&childFB, i) {
181				return o, xerrors.Errorf("arrow/ipc: could not load field child %d", i)
182			}
183			child, err := fieldFromFB(&childFB, memo)
184			if err != nil {
185				return o, xerrors.Errorf("arrow/ipc: could not convert field child %d: %w", i, err)
186			}
187			children[i] = child
188		}
189
190		o.Type, err = typeFromFB(field, children, o.Metadata)
191		if err != nil {
192			return o, xerrors.Errorf("arrow/ipc: could not convert field type: %w", err)
193		}
194	default:
195		panic("not implemented") // FIXME(sbinet)
196	}
197
198	return o, nil
199}
200
201func fieldToFB(b *flatbuffers.Builder, field arrow.Field, memo *dictMemo) flatbuffers.UOffsetT {
202	var visitor = fieldVisitor{b: b, memo: memo, meta: make(map[string]string)}
203	return visitor.result(field)
204}
205
// fieldVisitor accumulates the flatbuffer pieces needed to serialize one
// arrow.Field; see visit and result.
type fieldVisitor struct {
	// b is the builder every object is written to.
	b *flatbuffers.Builder
	// memo is handed down to the serialization of child fields.
	memo *dictMemo
	// dtype is the union tag of the field's type, set by visit.
	dtype flatbuf.Type
	// offset is the offset of the field's type table, set by visit.
	offset flatbuffers.UOffsetT
	// kids holds the offsets of the serialized child fields.
	kids []flatbuffers.UOffsetT
	// meta is extra custom metadata, written (sorted by key) after field.Metadata.
	meta map[string]string
}
214
// visit serializes the type of field into fv.b and records:
//   - fv.dtype: the flatbuf.Type union tag,
//   - fv.offset: the offset of the finished type table,
//   - fv.kids: (appended) offsets of the child fields of nested types.
//
// Strings and child fields are always created before the enclosing type
// table is started (see the Timestamp, Struct, List and FixedSizeList cases).
// visit panics on data types that are not handled yet.
func (fv *fieldVisitor) visit(field arrow.Field) {
	dt := field.Type
	switch dt := dt.(type) {
	case *arrow.NullType:
		fv.dtype = flatbuf.TypeNull
		flatbuf.NullStart(fv.b)
		fv.offset = flatbuf.NullEnd(fv.b)

	case *arrow.BooleanType:
		fv.dtype = flatbuf.TypeBool
		flatbuf.BoolStart(fv.b)
		fv.offset = flatbuf.BoolEnd(fv.b)

	// Integer types share the flatbuf Int table: bit width + signedness.
	case *arrow.Uint8Type:
		fv.dtype = flatbuf.TypeInt
		fv.offset = intToFB(fv.b, int32(dt.BitWidth()), false)

	case *arrow.Uint16Type:
		fv.dtype = flatbuf.TypeInt
		fv.offset = intToFB(fv.b, int32(dt.BitWidth()), false)

	case *arrow.Uint32Type:
		fv.dtype = flatbuf.TypeInt
		fv.offset = intToFB(fv.b, int32(dt.BitWidth()), false)

	case *arrow.Uint64Type:
		fv.dtype = flatbuf.TypeInt
		fv.offset = intToFB(fv.b, int32(dt.BitWidth()), false)

	case *arrow.Int8Type:
		fv.dtype = flatbuf.TypeInt
		fv.offset = intToFB(fv.b, int32(dt.BitWidth()), true)

	case *arrow.Int16Type:
		fv.dtype = flatbuf.TypeInt
		fv.offset = intToFB(fv.b, int32(dt.BitWidth()), true)

	case *arrow.Int32Type:
		fv.dtype = flatbuf.TypeInt
		fv.offset = intToFB(fv.b, int32(dt.BitWidth()), true)

	case *arrow.Int64Type:
		fv.dtype = flatbuf.TypeInt
		fv.offset = intToFB(fv.b, int32(dt.BitWidth()), true)

	// Floating point types share the FloatingPoint table; floatToFB maps the
	// bit width onto the flatbuf precision.
	case *arrow.Float16Type:
		fv.dtype = flatbuf.TypeFloatingPoint
		fv.offset = floatToFB(fv.b, int32(dt.BitWidth()))

	case *arrow.Float32Type:
		fv.dtype = flatbuf.TypeFloatingPoint
		fv.offset = floatToFB(fv.b, int32(dt.BitWidth()))

	case *arrow.Float64Type:
		fv.dtype = flatbuf.TypeFloatingPoint
		fv.offset = floatToFB(fv.b, int32(dt.BitWidth()))

	case *arrow.Decimal128Type:
		fv.dtype = flatbuf.TypeDecimal
		flatbuf.DecimalStart(fv.b)
		flatbuf.DecimalAddPrecision(fv.b, dt.Precision)
		flatbuf.DecimalAddScale(fv.b, dt.Scale)
		fv.offset = flatbuf.DecimalEnd(fv.b)

	case *arrow.FixedSizeBinaryType:
		fv.dtype = flatbuf.TypeFixedSizeBinary
		flatbuf.FixedSizeBinaryStart(fv.b)
		flatbuf.FixedSizeBinaryAddByteWidth(fv.b, int32(dt.ByteWidth))
		fv.offset = flatbuf.FixedSizeBinaryEnd(fv.b)

	case *arrow.BinaryType:
		fv.dtype = flatbuf.TypeBinary
		flatbuf.BinaryStart(fv.b)
		fv.offset = flatbuf.BinaryEnd(fv.b)

	case *arrow.StringType:
		fv.dtype = flatbuf.TypeUtf8
		flatbuf.Utf8Start(fv.b)
		fv.offset = flatbuf.Utf8End(fv.b)

	// Date32 is encoded with a DAY unit, Date64 with a MILLISECOND unit.
	case *arrow.Date32Type:
		fv.dtype = flatbuf.TypeDate
		flatbuf.DateStart(fv.b)
		flatbuf.DateAddUnit(fv.b, flatbuf.DateUnitDAY)
		fv.offset = flatbuf.DateEnd(fv.b)

	case *arrow.Date64Type:
		fv.dtype = flatbuf.TypeDate
		flatbuf.DateStart(fv.b)
		flatbuf.DateAddUnit(fv.b, flatbuf.DateUnitMILLISECOND)
		fv.offset = flatbuf.DateEnd(fv.b)

	// Time32/Time64 share the Time table and differ by the recorded bit width.
	case *arrow.Time32Type:
		fv.dtype = flatbuf.TypeTime
		flatbuf.TimeStart(fv.b)
		flatbuf.TimeAddUnit(fv.b, unitToFB(dt.Unit))
		flatbuf.TimeAddBitWidth(fv.b, 32)
		fv.offset = flatbuf.TimeEnd(fv.b)

	case *arrow.Time64Type:
		fv.dtype = flatbuf.TypeTime
		flatbuf.TimeStart(fv.b)
		flatbuf.TimeAddUnit(fv.b, unitToFB(dt.Unit))
		flatbuf.TimeAddBitWidth(fv.b, 64)
		fv.offset = flatbuf.TimeEnd(fv.b)

	case *arrow.TimestampType:
		fv.dtype = flatbuf.TypeTimestamp
		unit := unitToFB(dt.Unit)
		// The time zone string must exist before TimestampStart; an empty
		// zone leaves tz as the zero offset.
		var tz flatbuffers.UOffsetT
		if dt.TimeZone != "" {
			tz = fv.b.CreateString(dt.TimeZone)
		}
		flatbuf.TimestampStart(fv.b)
		flatbuf.TimestampAddUnit(fv.b, unit)
		flatbuf.TimestampAddTimezone(fv.b, tz)
		fv.offset = flatbuf.TimestampEnd(fv.b)

	case *arrow.StructType:
		fv.dtype = flatbuf.TypeStruct_
		// Serialize every member field first; they become this field's kids.
		offsets := make([]flatbuffers.UOffsetT, len(dt.Fields()))
		for i, field := range dt.Fields() {
			offsets[i] = fieldToFB(fv.b, field, fv.memo)
		}
		flatbuf.Struct_Start(fv.b)
		// NOTE(review): the child offsets are prepended inside the Struct_
		// table itself; presumably Struct_ declares no fields, so these bytes
		// look unreferenced (the children are recorded through fv.kids).
		// Confirm against the flatbuf schema before changing: removing them
		// alters the serialized bytes.
		for i := len(offsets) - 1; i >= 0; i-- {
			fv.b.PrependUOffsetT(offsets[i])
		}
		fv.offset = flatbuf.Struct_End(fv.b)
		fv.kids = append(fv.kids, offsets...)

	case *arrow.ListType:
		fv.dtype = flatbuf.TypeList
		// The single child is named "item" and inherits the list field's nullability.
		fv.kids = append(fv.kids, fieldToFB(fv.b, arrow.Field{Name: "item", Type: dt.Elem(), Nullable: field.Nullable}, fv.memo))
		flatbuf.ListStart(fv.b)
		fv.offset = flatbuf.ListEnd(fv.b)

	case *arrow.FixedSizeListType:
		fv.dtype = flatbuf.TypeFixedSizeList
		// Same "item" child as List, plus the fixed list size in the type table.
		fv.kids = append(fv.kids, fieldToFB(fv.b, arrow.Field{Name: "item", Type: dt.Elem(), Nullable: field.Nullable}, fv.memo))
		flatbuf.FixedSizeListStart(fv.b)
		flatbuf.FixedSizeListAddListSize(fv.b, dt.Len())
		fv.offset = flatbuf.FixedSizeListEnd(fv.b)

	case *arrow.MonthIntervalType:
		fv.dtype = flatbuf.TypeInterval
		flatbuf.IntervalStart(fv.b)
		flatbuf.IntervalAddUnit(fv.b, flatbuf.IntervalUnitYEAR_MONTH)
		fv.offset = flatbuf.IntervalEnd(fv.b)

	case *arrow.DayTimeIntervalType:
		fv.dtype = flatbuf.TypeInterval
		flatbuf.IntervalStart(fv.b)
		flatbuf.IntervalAddUnit(fv.b, flatbuf.IntervalUnitDAY_TIME)
		fv.offset = flatbuf.IntervalEnd(fv.b)

	case *arrow.DurationType:
		fv.dtype = flatbuf.TypeDuration
		unit := unitToFB(dt.Unit)
		flatbuf.DurationStart(fv.b)
		flatbuf.DurationAddUnit(fv.b, unit)
		fv.offset = flatbuf.DurationEnd(fv.b)

	default:
		err := xerrors.Errorf("arrow/ipc: invalid data type %v", dt)
		panic(err) // FIXME(sbinet): implement all data-types.
	}
}
383
384func (fv *fieldVisitor) result(field arrow.Field) flatbuffers.UOffsetT {
385	nameFB := fv.b.CreateString(field.Name)
386
387	fv.visit(field)
388
389	flatbuf.FieldStartChildrenVector(fv.b, len(fv.kids))
390	for i := len(fv.kids) - 1; i >= 0; i-- {
391		fv.b.PrependUOffsetT(fv.kids[i])
392	}
393	kidsFB := fv.b.EndVector(len(fv.kids))
394
395	var dictFB flatbuffers.UOffsetT
396	if field.Type.ID() == arrow.DICTIONARY {
397		panic("not implemented") // FIXME(sbinet)
398	}
399
400	var (
401		metaFB flatbuffers.UOffsetT
402		kvs    []flatbuffers.UOffsetT
403	)
404	for i, k := range field.Metadata.Keys() {
405		v := field.Metadata.Values()[i]
406		kk := fv.b.CreateString(k)
407		vv := fv.b.CreateString(v)
408		flatbuf.KeyValueStart(fv.b)
409		flatbuf.KeyValueAddKey(fv.b, kk)
410		flatbuf.KeyValueAddValue(fv.b, vv)
411		kvs = append(kvs, flatbuf.KeyValueEnd(fv.b))
412	}
413	{
414		keys := make([]string, 0, len(fv.meta))
415		for k := range fv.meta {
416			keys = append(keys, k)
417		}
418		sort.Strings(keys)
419		for _, k := range keys {
420			v := fv.meta[k]
421			kk := fv.b.CreateString(k)
422			vv := fv.b.CreateString(v)
423			flatbuf.KeyValueStart(fv.b)
424			flatbuf.KeyValueAddKey(fv.b, kk)
425			flatbuf.KeyValueAddValue(fv.b, vv)
426			kvs = append(kvs, flatbuf.KeyValueEnd(fv.b))
427		}
428	}
429	if len(kvs) > 0 {
430		flatbuf.FieldStartCustomMetadataVector(fv.b, len(kvs))
431		for i := len(kvs) - 1; i >= 0; i-- {
432			fv.b.PrependUOffsetT(kvs[i])
433		}
434		metaFB = fv.b.EndVector(len(kvs))
435	}
436
437	flatbuf.FieldStart(fv.b)
438	flatbuf.FieldAddName(fv.b, nameFB)
439	flatbuf.FieldAddNullable(fv.b, field.Nullable)
440	flatbuf.FieldAddTypeType(fv.b, fv.dtype)
441	flatbuf.FieldAddType(fv.b, fv.offset)
442	flatbuf.FieldAddDictionary(fv.b, dictFB)
443	flatbuf.FieldAddChildren(fv.b, kidsFB)
444	flatbuf.FieldAddCustomMetadata(fv.b, metaFB)
445
446	offset := flatbuf.FieldEnd(fv.b)
447
448	return offset
449}
450
451func fieldFromFBDict(field *flatbuf.Field) (arrow.Field, error) {
452	var (
453		o = arrow.Field{
454			Name:     string(field.Name()),
455			Nullable: field.Nullable(),
456		}
457		err  error
458		memo = newMemo()
459	)
460
461	// any DictionaryEncoding set is ignored here.
462
463	kids := make([]arrow.Field, field.ChildrenLength())
464	for i := range kids {
465		var kid flatbuf.Field
466		if !field.Children(&kid, i) {
467			return o, xerrors.Errorf("arrow/ipc: could not load field child %d", i)
468		}
469		kids[i], err = fieldFromFB(&kid, &memo)
470		if err != nil {
471			return o, xerrors.Errorf("arrow/ipc: field from dict: %w", err)
472		}
473	}
474
475	meta, err := metadataFromFB(field)
476	if err != nil {
477		return o, xerrors.Errorf("arrow/ipc: metadata for field from dict: %w", err)
478	}
479
480	o.Type, err = typeFromFB(field, kids, meta)
481	if err != nil {
482		return o, xerrors.Errorf("arrow/ipc: type for field from dict: %w", err)
483	}
484
485	return o, nil
486}
487
488func typeFromFB(field *flatbuf.Field, children []arrow.Field, md arrow.Metadata) (arrow.DataType, error) {
489	var data flatbuffers.Table
490	if !field.Type(&data) {
491		return nil, xerrors.Errorf("arrow/ipc: could not load field type data")
492	}
493
494	dt, err := concreteTypeFromFB(field.TypeType(), data, children)
495	if err != nil {
496		return dt, err
497	}
498
499	// look for extension metadata in custom metadata field.
500	if md.Len() > 0 {
501		i := md.FindKey(kExtensionTypeKeyName)
502		if i < 0 {
503			return dt, err
504		}
505
506		panic("not implemented") // FIXME(sbinet)
507	}
508
509	return dt, err
510}
511
512func concreteTypeFromFB(typ flatbuf.Type, data flatbuffers.Table, children []arrow.Field) (arrow.DataType, error) {
513	var (
514		dt  arrow.DataType
515		err error
516	)
517
518	switch typ {
519	case flatbuf.TypeNONE:
520		return nil, xerrors.Errorf("arrow/ipc: Type metadata cannot be none")
521
522	case flatbuf.TypeNull:
523		return arrow.Null, nil
524
525	case flatbuf.TypeInt:
526		var dt flatbuf.Int
527		dt.Init(data.Bytes, data.Pos)
528		return intFromFB(dt)
529
530	case flatbuf.TypeFloatingPoint:
531		var dt flatbuf.FloatingPoint
532		dt.Init(data.Bytes, data.Pos)
533		return floatFromFB(dt)
534
535	case flatbuf.TypeDecimal:
536		var dt flatbuf.Decimal
537		dt.Init(data.Bytes, data.Pos)
538		return decimalFromFB(dt)
539
540	case flatbuf.TypeBinary:
541		return arrow.BinaryTypes.Binary, nil
542
543	case flatbuf.TypeFixedSizeBinary:
544		var dt flatbuf.FixedSizeBinary
545		dt.Init(data.Bytes, data.Pos)
546		return &arrow.FixedSizeBinaryType{ByteWidth: int(dt.ByteWidth())}, nil
547
548	case flatbuf.TypeUtf8:
549		return arrow.BinaryTypes.String, nil
550
551	case flatbuf.TypeBool:
552		return arrow.FixedWidthTypes.Boolean, nil
553
554	case flatbuf.TypeList:
555		if len(children) != 1 {
556			return nil, xerrors.Errorf("arrow/ipc: List must have exactly 1 child field (got=%d)", len(children))
557		}
558		return arrow.ListOf(children[0].Type), nil
559
560	case flatbuf.TypeFixedSizeList:
561		var dt flatbuf.FixedSizeList
562		dt.Init(data.Bytes, data.Pos)
563		if len(children) != 1 {
564			return nil, xerrors.Errorf("arrow/ipc: FixedSizeList must have exactly 1 child field (got=%d)", len(children))
565		}
566		return arrow.FixedSizeListOf(dt.ListSize(), children[0].Type), nil
567
568	case flatbuf.TypeStruct_:
569		return arrow.StructOf(children...), nil
570
571	case flatbuf.TypeTime:
572		var dt flatbuf.Time
573		dt.Init(data.Bytes, data.Pos)
574		return timeFromFB(dt)
575
576	case flatbuf.TypeTimestamp:
577		var dt flatbuf.Timestamp
578		dt.Init(data.Bytes, data.Pos)
579		return timestampFromFB(dt)
580
581	case flatbuf.TypeDate:
582		var dt flatbuf.Date
583		dt.Init(data.Bytes, data.Pos)
584		return dateFromFB(dt)
585
586	case flatbuf.TypeInterval:
587		var dt flatbuf.Interval
588		dt.Init(data.Bytes, data.Pos)
589		return intervalFromFB(dt)
590
591	case flatbuf.TypeDuration:
592		var dt flatbuf.Duration
593		dt.Init(data.Bytes, data.Pos)
594		return durationFromFB(dt)
595
596	default:
597		// FIXME(sbinet): implement all the other types.
598		panic(xerrors.Errorf("arrow/ipc: type %v not implemented", flatbuf.EnumNamesType[typ]))
599	}
600
601	return dt, err
602}
603
604func intFromFB(data flatbuf.Int) (arrow.DataType, error) {
605	bw := data.BitWidth()
606	if bw > 64 {
607		return nil, xerrors.Errorf("arrow/ipc: integers with more than 64 bits not implemented (bits=%d)", bw)
608	}
609	if bw < 8 {
610		return nil, xerrors.Errorf("arrow/ipc: integers with less than 8 bits not implemented (bits=%d)", bw)
611	}
612
613	switch bw {
614	case 8:
615		if !data.IsSigned() {
616			return arrow.PrimitiveTypes.Uint8, nil
617		}
618		return arrow.PrimitiveTypes.Int8, nil
619
620	case 16:
621		if !data.IsSigned() {
622			return arrow.PrimitiveTypes.Uint16, nil
623		}
624		return arrow.PrimitiveTypes.Int16, nil
625
626	case 32:
627		if !data.IsSigned() {
628			return arrow.PrimitiveTypes.Uint32, nil
629		}
630		return arrow.PrimitiveTypes.Int32, nil
631
632	case 64:
633		if !data.IsSigned() {
634			return arrow.PrimitiveTypes.Uint64, nil
635		}
636		return arrow.PrimitiveTypes.Int64, nil
637	default:
638		return nil, xerrors.Errorf("arrow/ipc: integers not in cstdint are not implemented")
639	}
640}
641
// intToFB writes a flatbuf Int type table with bit width bw and the given
// signedness into b, and returns the offset of the finished table.
func intToFB(b *flatbuffers.Builder, bw int32, isSigned bool) flatbuffers.UOffsetT {
	flatbuf.IntStart(b)
	flatbuf.IntAddBitWidth(b, bw)
	flatbuf.IntAddIsSigned(b, isSigned)
	return flatbuf.IntEnd(b)
}
648
649func floatFromFB(data flatbuf.FloatingPoint) (arrow.DataType, error) {
650	switch p := data.Precision(); p {
651	case flatbuf.PrecisionHALF:
652		return arrow.FixedWidthTypes.Float16, nil
653	case flatbuf.PrecisionSINGLE:
654		return arrow.PrimitiveTypes.Float32, nil
655	case flatbuf.PrecisionDOUBLE:
656		return arrow.PrimitiveTypes.Float64, nil
657	default:
658		return nil, xerrors.Errorf("arrow/ipc: floating point type with %d precision not implemented", p)
659	}
660}
661
662func floatToFB(b *flatbuffers.Builder, bw int32) flatbuffers.UOffsetT {
663	switch bw {
664	case 16:
665		flatbuf.FloatingPointStart(b)
666		flatbuf.FloatingPointAddPrecision(b, flatbuf.PrecisionHALF)
667		return flatbuf.FloatingPointEnd(b)
668	case 32:
669		flatbuf.FloatingPointStart(b)
670		flatbuf.FloatingPointAddPrecision(b, flatbuf.PrecisionSINGLE)
671		return flatbuf.FloatingPointEnd(b)
672	case 64:
673		flatbuf.FloatingPointStart(b)
674		flatbuf.FloatingPointAddPrecision(b, flatbuf.PrecisionDOUBLE)
675		return flatbuf.FloatingPointEnd(b)
676	default:
677		panic(xerrors.Errorf("arrow/ipc: invalid floating point precision %d-bits", bw))
678	}
679}
680
681func decimalFromFB(data flatbuf.Decimal) (arrow.DataType, error) {
682	return &arrow.Decimal128Type{Precision: data.Precision(), Scale: data.Scale()}, nil
683}
684
685func timeFromFB(data flatbuf.Time) (arrow.DataType, error) {
686	bw := data.BitWidth()
687	unit := unitFromFB(data.Unit())
688
689	switch bw {
690	case 32:
691		switch unit {
692		case arrow.Millisecond:
693			return arrow.FixedWidthTypes.Time32ms, nil
694		case arrow.Second:
695			return arrow.FixedWidthTypes.Time32s, nil
696		default:
697			return nil, xerrors.Errorf("arrow/ipc: Time32 type with %v unit not implemented", unit)
698		}
699	case 64:
700		switch unit {
701		case arrow.Nanosecond:
702			return arrow.FixedWidthTypes.Time64ns, nil
703		case arrow.Microsecond:
704			return arrow.FixedWidthTypes.Time64us, nil
705		default:
706			return nil, xerrors.Errorf("arrow/ipc: Time64 type with %v unit not implemented", unit)
707		}
708	default:
709		return nil, xerrors.Errorf("arrow/ipc: Time type with %d bitwidth not implemented", bw)
710	}
711}
712
713func timestampFromFB(data flatbuf.Timestamp) (arrow.DataType, error) {
714	unit := unitFromFB(data.Unit())
715	tz := string(data.Timezone())
716	return &arrow.TimestampType{Unit: unit, TimeZone: tz}, nil
717}
718
719func dateFromFB(data flatbuf.Date) (arrow.DataType, error) {
720	switch data.Unit() {
721	case flatbuf.DateUnitDAY:
722		return arrow.FixedWidthTypes.Date32, nil
723	case flatbuf.DateUnitMILLISECOND:
724		return arrow.FixedWidthTypes.Date64, nil
725	}
726	return nil, xerrors.Errorf("arrow/ipc: Date type with %d unit not implemented", data.Unit())
727}
728
729func intervalFromFB(data flatbuf.Interval) (arrow.DataType, error) {
730	switch data.Unit() {
731	case flatbuf.IntervalUnitYEAR_MONTH:
732		return arrow.FixedWidthTypes.MonthInterval, nil
733	case flatbuf.IntervalUnitDAY_TIME:
734		return arrow.FixedWidthTypes.DayTimeInterval, nil
735	}
736	return nil, xerrors.Errorf("arrow/ipc: Interval type with %d unit not implemented", data.Unit())
737}
738
739func durationFromFB(data flatbuf.Duration) (arrow.DataType, error) {
740	switch data.Unit() {
741	case flatbuf.TimeUnitSECOND:
742		return arrow.FixedWidthTypes.Duration_s, nil
743	case flatbuf.TimeUnitMILLISECOND:
744		return arrow.FixedWidthTypes.Duration_ms, nil
745	case flatbuf.TimeUnitMICROSECOND:
746		return arrow.FixedWidthTypes.Duration_us, nil
747	case flatbuf.TimeUnitNANOSECOND:
748		return arrow.FixedWidthTypes.Duration_ns, nil
749	}
750	return nil, xerrors.Errorf("arrow/ipc: Duration type with %d unit not implemented", data.Unit())
751}
752
// customMetadataer is satisfied by flatbuf tables that expose a custom
// metadata key-value vector; in this file it is used with *flatbuf.Field and
// *flatbuf.Schema through metadataFromFB.
type customMetadataer interface {
	CustomMetadataLength() int
	CustomMetadata(*flatbuf.KeyValue, int) bool
}
757
758func metadataFromFB(md customMetadataer) (arrow.Metadata, error) {
759	var (
760		keys = make([]string, md.CustomMetadataLength())
761		vals = make([]string, md.CustomMetadataLength())
762	)
763
764	for i := range keys {
765		var kv flatbuf.KeyValue
766		if !md.CustomMetadata(&kv, i) {
767			return arrow.Metadata{}, xerrors.Errorf("arrow/ipc: could not read key-value %d from flatbuffer", i)
768		}
769		keys[i] = string(kv.Key())
770		vals[i] = string(kv.Value())
771	}
772
773	return arrow.NewMetadata(keys, vals), nil
774}
775
776func metadataToFB(b *flatbuffers.Builder, meta arrow.Metadata, start startVecFunc) flatbuffers.UOffsetT {
777	if meta.Len() == 0 {
778		return 0
779	}
780
781	n := meta.Len()
782	kvs := make([]flatbuffers.UOffsetT, n)
783	for i := range kvs {
784		k := b.CreateString(meta.Keys()[i])
785		v := b.CreateString(meta.Values()[i])
786		flatbuf.KeyValueStart(b)
787		flatbuf.KeyValueAddKey(b, k)
788		flatbuf.KeyValueAddValue(b, v)
789		kvs[i] = flatbuf.KeyValueEnd(b)
790	}
791
792	start(b, n)
793	for i := n - 1; i >= 0; i-- {
794		b.PrependUOffsetT(kvs[i])
795	}
796	return b.EndVector(n)
797}
798
799func schemaFromFB(schema *flatbuf.Schema, memo *dictMemo) (*arrow.Schema, error) {
800	var (
801		err    error
802		fields = make([]arrow.Field, schema.FieldsLength())
803	)
804
805	for i := range fields {
806		var field flatbuf.Field
807		if !schema.Fields(&field, i) {
808			return nil, xerrors.Errorf("arrow/ipc: could not read field %d from schema", i)
809		}
810
811		fields[i], err = fieldFromFB(&field, memo)
812		if err != nil {
813			return nil, xerrors.Errorf("arrow/ipc: could not convert field %d from flatbuf: %w", i, err)
814		}
815	}
816
817	md, err := metadataFromFB(schema)
818	if err != nil {
819		return nil, xerrors.Errorf("arrow/ipc: could not convert schema metadata from flatbuf: %w", err)
820	}
821
822	return arrow.NewSchema(fields, &md), nil
823}
824
825func schemaToFB(b *flatbuffers.Builder, schema *arrow.Schema, memo *dictMemo) flatbuffers.UOffsetT {
826	fields := make([]flatbuffers.UOffsetT, len(schema.Fields()))
827	for i, field := range schema.Fields() {
828		fields[i] = fieldToFB(b, field, memo)
829	}
830
831	flatbuf.SchemaStartFieldsVector(b, len(fields))
832	for i := len(fields) - 1; i >= 0; i-- {
833		b.PrependUOffsetT(fields[i])
834	}
835	fieldsFB := b.EndVector(len(fields))
836
837	metaFB := metadataToFB(b, schema.Metadata(), flatbuf.SchemaStartCustomMetadataVector)
838
839	flatbuf.SchemaStart(b)
840	flatbuf.SchemaAddEndianness(b, flatbuf.EndiannessLittle)
841	flatbuf.SchemaAddFields(b, fieldsFB)
842	flatbuf.SchemaAddCustomMetadata(b, metaFB)
843	offset := flatbuf.SchemaEnd(b)
844
845	return offset
846}
847
848func dictTypesFromFB(schema *flatbuf.Schema) (dictTypeMap, error) {
849	var (
850		err    error
851		fields = make(dictTypeMap, schema.FieldsLength())
852	)
853	for i := 0; i < schema.FieldsLength(); i++ {
854		var field flatbuf.Field
855		if !schema.Fields(&field, i) {
856			return nil, xerrors.Errorf("arrow/ipc: could not load field %d from schema", i)
857		}
858		fields, err = visitField(&field, fields)
859		if err != nil {
860			return nil, xerrors.Errorf("arrow/ipc: could not visit field %d from schema: %w", i, err)
861		}
862	}
863	return fields, err
864}
865
866func visitField(field *flatbuf.Field, dict dictTypeMap) (dictTypeMap, error) {
867	var err error
868	meta := field.Dictionary(nil)
869	switch meta {
870	case nil:
871		// field is not dictionary encoded.
872		// => visit children.
873		for i := 0; i < field.ChildrenLength(); i++ {
874			var child flatbuf.Field
875			if !field.Children(&child, i) {
876				return nil, xerrors.Errorf("arrow/ipc: could not visit child %d from field", i)
877			}
878			dict, err = visitField(&child, dict)
879			if err != nil {
880				return nil, err
881			}
882		}
883	default:
884		// field is dictionary encoded.
885		// construct the data type for the dictionary: no descendants can be dict-encoded.
886		dfield, err := fieldFromFBDict(field)
887		if err != nil {
888			return nil, xerrors.Errorf("arrow/ipc: could not create data type for dictionary: %w", err)
889		}
890		dict[meta.Id()] = dfield
891	}
892	return dict, err
893}
894
895// payloadsFromSchema returns a slice of payloads corresponding to the given schema.
896// Callers of payloadsFromSchema will need to call Release after use.
897func payloadsFromSchema(schema *arrow.Schema, mem memory.Allocator, memo *dictMemo) payloads {
898	dict := newMemo()
899
900	ps := make(payloads, 1, dict.Len()+1)
901	ps[0].msg = MessageSchema
902	ps[0].meta = writeSchemaMessage(schema, mem, &dict)
903
904	// append dictionaries.
905	if dict.Len() > 0 {
906		panic("payloads-from-schema: not-implemented")
907		//		for id, arr := range dict.id2dict {
908		//			// GetSchemaPayloads: writer.cc:535
909		//		}
910	}
911
912	if memo != nil {
913		*memo = dict
914	}
915
916	return ps
917}
918
919func writeFBBuilder(b *flatbuffers.Builder, mem memory.Allocator) *memory.Buffer {
920	raw := b.FinishedBytes()
921	buf := memory.NewResizableBuffer(mem)
922	buf.Resize(len(raw))
923	copy(buf.Bytes(), raw)
924	return buf
925}
926
927func writeMessageFB(b *flatbuffers.Builder, mem memory.Allocator, hdrType flatbuf.MessageHeader, hdr flatbuffers.UOffsetT, bodyLen int64) *memory.Buffer {
928
929	flatbuf.MessageStart(b)
930	flatbuf.MessageAddVersion(b, int16(currentMetadataVersion))
931	flatbuf.MessageAddHeaderType(b, hdrType)
932	flatbuf.MessageAddHeader(b, hdr)
933	flatbuf.MessageAddBodyLength(b, bodyLen)
934	msg := flatbuf.MessageEnd(b)
935	b.Finish(msg)
936
937	return writeFBBuilder(b, mem)
938}
939
940func writeSchemaMessage(schema *arrow.Schema, mem memory.Allocator, dict *dictMemo) *memory.Buffer {
941	b := flatbuffers.NewBuilder(1024)
942	schemaFB := schemaToFB(b, schema, dict)
943	return writeMessageFB(b, mem, flatbuf.MessageHeaderSchema, schemaFB, 0)
944}
945
946func writeFileFooter(schema *arrow.Schema, dicts, recs []fileBlock, w io.Writer) error {
947	var (
948		b    = flatbuffers.NewBuilder(1024)
949		memo = newMemo()
950	)
951
952	schemaFB := schemaToFB(b, schema, &memo)
953	dictsFB := fileBlocksToFB(b, dicts, flatbuf.FooterStartDictionariesVector)
954	recsFB := fileBlocksToFB(b, recs, flatbuf.FooterStartRecordBatchesVector)
955
956	flatbuf.FooterStart(b)
957	flatbuf.FooterAddVersion(b, int16(currentMetadataVersion))
958	flatbuf.FooterAddSchema(b, schemaFB)
959	flatbuf.FooterAddDictionaries(b, dictsFB)
960	flatbuf.FooterAddRecordBatches(b, recsFB)
961	footer := flatbuf.FooterEnd(b)
962
963	b.Finish(footer)
964
965	_, err := w.Write(b.FinishedBytes())
966	return err
967}
968
969func writeRecordMessage(mem memory.Allocator, size, bodyLength int64, fields []fieldMetadata, meta []bufferMetadata) *memory.Buffer {
970	b := flatbuffers.NewBuilder(0)
971	recFB := recordToFB(b, size, bodyLength, fields, meta)
972	return writeMessageFB(b, mem, flatbuf.MessageHeaderRecordBatch, recFB, bodyLength)
973}
974
975func recordToFB(b *flatbuffers.Builder, size, bodyLength int64, fields []fieldMetadata, meta []bufferMetadata) flatbuffers.UOffsetT {
976	fieldsFB := writeFieldNodes(b, fields, flatbuf.RecordBatchStartNodesVector)
977	metaFB := writeBuffers(b, meta, flatbuf.RecordBatchStartBuffersVector)
978
979	flatbuf.RecordBatchStart(b)
980	flatbuf.RecordBatchAddLength(b, size)
981	flatbuf.RecordBatchAddNodes(b, fieldsFB)
982	flatbuf.RecordBatchAddBuffers(b, metaFB)
983	return flatbuf.RecordBatchEnd(b)
984}
985
986func writeFieldNodes(b *flatbuffers.Builder, fields []fieldMetadata, start startVecFunc) flatbuffers.UOffsetT {
987
988	start(b, len(fields))
989	for i := len(fields) - 1; i >= 0; i-- {
990		field := fields[i]
991		if field.Offset != 0 {
992			panic(xerrors.Errorf("arrow/ipc: field metadata for IPC must have offset 0"))
993		}
994		flatbuf.CreateFieldNode(b, field.Len, field.Nulls)
995	}
996
997	return b.EndVector(len(fields))
998}
999
1000func writeBuffers(b *flatbuffers.Builder, buffers []bufferMetadata, start startVecFunc) flatbuffers.UOffsetT {
1001	start(b, len(buffers))
1002	for i := len(buffers) - 1; i >= 0; i-- {
1003		buffer := buffers[i]
1004		flatbuf.CreateBuffer(b, buffer.Offset, buffer.Len)
1005	}
1006	return b.EndVector(len(buffers))
1007}
1008
1009func writeMessage(msg *memory.Buffer, alignment int32, w io.Writer) (int, error) {
1010	var (
1011		n   int
1012		err error
1013	)
1014
1015	// ARROW-3212: we do not make any assumption on whether the output stream is aligned or not.
1016	paddedMsgLen := int32(msg.Len()) + 8
1017	remainder := paddedMsgLen % alignment
1018	if remainder != 0 {
1019		paddedMsgLen += alignment - remainder
1020	}
1021
1022	tmp := make([]byte, 4)
1023
1024	// write continuation indicator, to address 8-byte alignment requirement from FlatBuffers.
1025	binary.LittleEndian.PutUint32(tmp, kIPCContToken)
1026	_, err = w.Write(tmp)
1027	if err != nil {
1028		return 0, xerrors.Errorf("arrow/ipc: could not write continuation bit indicator: %w", err)
1029	}
1030
1031	// the returned message size includes the length prefix, the flatbuffer, + padding
1032	n = int(paddedMsgLen)
1033
1034	// write the flatbuffer size prefix, including padding
1035	sizeFB := paddedMsgLen - 8
1036	binary.LittleEndian.PutUint32(tmp, uint32(sizeFB))
1037	_, err = w.Write(tmp)
1038	if err != nil {
1039		return n, xerrors.Errorf("arrow/ipc: could not write message flatbuffer size prefix: %w", err)
1040	}
1041
1042	// write the flatbuffer
1043	_, err = w.Write(msg.Bytes())
1044	if err != nil {
1045		return n, xerrors.Errorf("arrow/ipc: could not write message flatbuffer: %w", err)
1046	}
1047
1048	// write any padding
1049	padding := paddedMsgLen - int32(msg.Len()) - 8
1050	if padding > 0 {
1051		_, err = w.Write(paddingBytes[:padding])
1052		if err != nil {
1053			return n, xerrors.Errorf("arrow/ipc: could not write message padding bytes: %w", err)
1054		}
1055	}
1056
1057	return n, err
1058}
1059