1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17// Package schema provides types and functions for manipulating and building parquet
18// file schemas.
19//
20// Some of the utilities provided include building a schema using Struct Tags
21// on a struct type, getting Column Paths from a node, and dealing with the
22// converted and logical types for Parquet.
23//
24// Logical types specify ways to interpret the primitive types allowing the
25// number of primitive types to be smaller and reuse efficient encodings.
26// For instance a "string" is just a ByteArray column with a UTF-8 annotation
27// or "String Logical Type".
28//
29// For more information about Logical and Converted Types, check:
30// https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
31package schema
32
33import (
34	"fmt"
35	"io"
36	"strings"
37
38	"github.com/apache/arrow/go/v6/parquet"
39	format "github.com/apache/arrow/go/v6/parquet/internal/gen-go/parquet"
40	"golang.org/x/xerrors"
41)
42
// Schema is the container for the converted Parquet schema with a computed
// information from the schema analysis needed for file reading
//
// * Column index to Node
//
// * Max repetition / definition levels for each primitive node
//
// The ColumnDescriptor objects produced by this class can be used to assist in
// the reconstruction of fully materialized data structures from the
// repetition-definition level encoding of nested data
type Schema struct {
	root Node // root group node the schema was built from (see NewSchema)

	leaves      []*Column              // one Column per primitive (leaf) node, in schema traversal order
	nodeToLeaf  map[*PrimitiveNode]int // reverse lookup: primitive node -> its leaf/column index
	leafToBase  map[int]Node           // leaf index -> the top-level field of root it falls under
	leafToIndex strIntMultimap         // dotted node path -> all leaf indexes sharing that path
}
61
62// FromParquet converts a slice of thrift Schema Elements to the correct node type
63func FromParquet(elems []*format.SchemaElement) (Node, error) {
64	if len(elems) == 0 {
65		return nil, xerrors.New("parquet: empty schema (no root)")
66	}
67
68	if elems[0].GetNumChildren() == 0 {
69		if len(elems) > 1 {
70			return nil, xerrors.New("parquet: schema had multiple nodes but root had no children")
71		}
72		// parquet file with no columns
73		return GroupNodeFromThrift(elems[0], []Node{})
74	}
75
76	// We don't check that the root node is repeated since this is not
77	// consistently set by implementations
78	var (
79		pos      = 0
80		nextNode func() (Node, error)
81	)
82
83	nextNode = func() (Node, error) {
84		if pos == len(elems) {
85			return nil, xerrors.New("parquet: malformed schema: not enough elements")
86		}
87
88		elem := elems[pos]
89		pos++
90
91		if elem.GetNumChildren() == 0 {
92			return PrimitiveNodeFromThrift(elem)
93		}
94
95		fields := make([]Node, 0, elem.GetNumChildren())
96		for i := 0; i < int(elem.GetNumChildren()); i++ {
97			n, err := nextNode()
98			if err != nil {
99				return nil, err
100			}
101			fields = append(fields, n)
102		}
103
104		return GroupNodeFromThrift(elem, fields)
105	}
106
107	return nextNode()
108}
109
110// Root returns the group node that is the root of this schema
111func (s *Schema) Root() *GroupNode {
112	return s.root.(*GroupNode)
113}
114
115// NumColumns returns the number of leaf nodes that are the actual primitive
116// columns in this schema.
117func (s *Schema) NumColumns() int {
118	return len(s.leaves)
119}
120
121// Equals returns true as long as the leaf columns are equal, doesn't take
122// into account the groups and only checks whether the schemas are compatible
123// at the physical storage level.
124func (s *Schema) Equals(rhs *Schema) bool {
125	if s.NumColumns() != rhs.NumColumns() {
126		return false
127	}
128
129	for idx, c := range s.leaves {
130		if !c.Equals(rhs.Column(idx)) {
131			return false
132		}
133	}
134	return true
135}
136
137func (s *Schema) buildTree(n Node, maxDefLvl, maxRepLvl int16, base Node) {
138	switch n.RepetitionType() {
139	case parquet.Repetitions.Repeated:
140		maxRepLvl++
141		fallthrough
142	case parquet.Repetitions.Optional:
143		maxDefLvl++
144	}
145
146	switch n := n.(type) {
147	case *GroupNode:
148		for _, f := range n.fields {
149			s.buildTree(f, maxDefLvl, maxRepLvl, base)
150		}
151	case *PrimitiveNode:
152		s.nodeToLeaf[n] = len(s.leaves)
153		s.leaves = append(s.leaves, NewColumn(n, maxDefLvl, maxRepLvl))
154		s.leafToBase[len(s.leaves)-1] = base
155		s.leafToIndex.Add(n.Path(), len(s.leaves)-1)
156	}
157}
158
159// Column returns the (0-indexed) column of the provided index.
160func (s *Schema) Column(i int) *Column {
161	return s.leaves[i]
162}
163
164// ColumnIndexByName looks up the column by it's full dot separated
165// node path. If there are multiple columns that match, it returns the first one.
166//
167// Returns -1 if not found.
168func (s *Schema) ColumnIndexByName(nodePath string) int {
169	if search, ok := s.leafToIndex[nodePath]; ok {
170		return search[0]
171	}
172	return -1
173}
174
175// ColumnIndexByNode returns the index of the column represented by this node.
176//
177// Returns -1 if not found.
178func (s *Schema) ColumnIndexByNode(n Node) int {
179	if search, ok := s.leafToIndex[n.Path()]; ok {
180		for _, idx := range search {
181			if n == s.Column(idx).SchemaNode() {
182				return idx
183			}
184		}
185	}
186	return -1
187}
188
189// ColumnRoot returns the root node of a given column if it is under a
190// nested group node, providing that root group node.
191func (s *Schema) ColumnRoot(i int) Node {
192	return s.leafToBase[i]
193}
194
195// HasRepeatedFields returns true if any node in the schema has a repeated field type.
196func (s *Schema) HasRepeatedFields() bool {
197	return s.root.(*GroupNode).HasRepeatedFields()
198}
199
200// UpdateColumnOrders must get a slice that is the same length as the number of leaf columns
201// and is used to update the schema metadata Column Orders. len(orders) must equal s.NumColumns()
202func (s *Schema) UpdateColumnOrders(orders []parquet.ColumnOrder) error {
203	if len(orders) != s.NumColumns() {
204		return xerrors.New("parquet: malformed schema: not enough ColumnOrder values")
205	}
206
207	visitor := schemaColumnOrderUpdater{orders, 0}
208	s.root.Visit(&visitor)
209	return nil
210}
211
212// NewSchema constructs a new Schema object from a root group node.
213//
214// Any fields with a field-id of -1 will be given an appropriate field number based on their order.
215func NewSchema(root *GroupNode) *Schema {
216	s := &Schema{
217		root,
218		make([]*Column, 0),
219		make(map[*PrimitiveNode]int),
220		make(map[int]Node),
221		make(strIntMultimap),
222	}
223
224	for _, f := range root.fields {
225		s.buildTree(f, 0, 0, f)
226	}
227	return s
228}
229
230type schemaColumnOrderUpdater struct {
231	colOrders []parquet.ColumnOrder
232	leafCount int
233}
234
235func (s *schemaColumnOrderUpdater) VisitPre(n Node) bool {
236	if n.Type() == Primitive {
237		leaf := n.(*PrimitiveNode)
238		leaf.ColumnOrder = s.colOrders[s.leafCount]
239		s.leafCount++
240	}
241	return true
242}
243
244func (s *schemaColumnOrderUpdater) VisitPost(Node) {}
245
246type toThriftVisitor struct {
247	elements []*format.SchemaElement
248}
249
250func (t *toThriftVisitor) VisitPre(n Node) bool {
251	t.elements = append(t.elements, n.toThrift())
252	return true
253}
254
255func (t *toThriftVisitor) VisitPost(Node) {}
256
257// ToThrift converts a GroupNode to a slice of SchemaElements which is used
258// for thrift serialization.
259func ToThrift(schema *GroupNode) []*format.SchemaElement {
260	t := &toThriftVisitor{make([]*format.SchemaElement, 0)}
261	schema.Visit(t)
262	return t.elements
263}
264
265type schemaPrinter struct {
266	w           io.Writer
267	indent      int
268	indentWidth int
269}
270
271func (s *schemaPrinter) VisitPre(n Node) bool {
272	fmt.Fprint(s.w, strings.Repeat(" ", s.indent))
273	if n.Type() == Group {
274		g := n.(*GroupNode)
275		fmt.Fprintf(s.w, "%s group field_id=%d %s", g.RepetitionType(), g.FieldID(), g.Name())
276		_, invalid := g.logicalType.(UnknownLogicalType)
277		_, none := g.logicalType.(NoLogicalType)
278
279		if g.logicalType != nil && !invalid && !none {
280			fmt.Fprintf(s.w, " (%s)", g.logicalType)
281		} else if g.convertedType != ConvertedTypes.None {
282			fmt.Fprintf(s.w, " (%s)", g.convertedType)
283		}
284
285		fmt.Fprintln(s.w, " {")
286		s.indent += s.indentWidth
287	} else {
288		p := n.(*PrimitiveNode)
289		fmt.Fprintf(s.w, "%s %s field_id=%d %s", p.RepetitionType(), strings.ToLower(p.PhysicalType().String()), p.FieldID(), p.Name())
290		_, invalid := p.logicalType.(UnknownLogicalType)
291		_, none := p.logicalType.(NoLogicalType)
292
293		if p.logicalType != nil && !invalid && !none {
294			fmt.Fprintf(s.w, " (%s)", p.logicalType)
295		} else if p.convertedType == ConvertedTypes.Decimal {
296			fmt.Fprintf(s.w, " (%s(%d,%d))", p.convertedType, p.DecimalMetadata().Precision, p.DecimalMetadata().Scale)
297		} else if p.convertedType != ConvertedTypes.None {
298			fmt.Fprintf(s.w, " (%s)", p.convertedType)
299		}
300		fmt.Fprintln(s.w, ";")
301	}
302	return true
303}
304
305func (s *schemaPrinter) VisitPost(n Node) {
306	if n.Type() == Group {
307		s.indent -= s.indentWidth
308		fmt.Fprint(s.w, strings.Repeat(" ", s.indent))
309		fmt.Fprintln(s.w, "}")
310	}
311}
312
313// PrintSchema writes a string representation of the tree to w using the indent
314// width provided.
315func PrintSchema(n Node, w io.Writer, indentWidth int) {
316	n.Visit(&schemaPrinter{w, 0, indentWidth})
317}
318
// strIntMultimap maps a string key to the ordered list of ints added under it.
type strIntMultimap map[string][]int

// Add appends val to the list stored under key, reporting whether the key
// already existed before this call.
func (f strIntMultimap) Add(key string, val int) bool {
	existing, found := f[key]
	if !found {
		f[key] = []int{val}
		return false
	}
	f[key] = append(existing, val)
	return true
}
329