1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17package arrow
18
19import (
20	"fmt"
21	"sort"
22	"strings"
23)
24
// Metadata holds string key-value pairs as two parallel slices: the key at
// position i in keys is paired with the value at position i in values.
type Metadata struct {
	keys, values []string
}
29
30func NewMetadata(keys, values []string) Metadata {
31	if len(keys) != len(values) {
32		panic("arrow: len mismatch")
33	}
34
35	n := len(keys)
36	if n == 0 {
37		return Metadata{}
38	}
39
40	md := Metadata{
41		keys:   make([]string, n),
42		values: make([]string, n),
43	}
44	copy(md.keys, keys)
45	copy(md.values, values)
46	return md
47}
48
49func MetadataFrom(kv map[string]string) Metadata {
50	md := Metadata{
51		keys:   make([]string, 0, len(kv)),
52		values: make([]string, 0, len(kv)),
53	}
54	for k := range kv {
55		md.keys = append(md.keys, k)
56	}
57	sort.Strings(md.keys)
58	for _, k := range md.keys {
59		md.values = append(md.values, kv[k])
60	}
61	return md
62}
63
64func (md Metadata) Len() int         { return len(md.keys) }
65func (md Metadata) Keys() []string   { return md.keys }
66func (md Metadata) Values() []string { return md.values }
67
68func (md Metadata) String() string {
69	o := new(strings.Builder)
70	fmt.Fprintf(o, "[")
71	for i := range md.keys {
72		if i > 0 {
73			fmt.Fprintf(o, ", ")
74		}
75		fmt.Fprintf(o, "%q: %q", md.keys[i], md.values[i])
76	}
77	fmt.Fprintf(o, "]")
78	return o.String()
79}
80
81// FindKey returns the index of the key-value pair with the provided key name,
82// or -1 if such a key does not exist.
83func (md Metadata) FindKey(k string) int {
84	for i, v := range md.keys {
85		if v == k {
86			return i
87		}
88	}
89	return -1
90}
91
92func (md Metadata) clone() Metadata {
93	if len(md.keys) == 0 {
94		return Metadata{}
95	}
96
97	o := Metadata{
98		keys:   make([]string, len(md.keys)),
99		values: make([]string, len(md.values)),
100	}
101	copy(o.keys, md.keys)
102	copy(o.values, md.values)
103
104	return o
105}
106
107func (md Metadata) sortedIndices() []int {
108	idxes := make([]int, len(md.keys))
109	for i := range idxes {
110		idxes[i] = i
111	}
112
113	sort.Slice(idxes, func(i, j int) bool {
114		return md.keys[idxes[i]] < md.keys[idxes[j]]
115	})
116	return idxes
117}
118
119func (md Metadata) Equal(rhs Metadata) bool {
120	if md.Len() != rhs.Len() {
121		return false
122	}
123
124	idxes := md.sortedIndices()
125	rhsIdxes := rhs.sortedIndices()
126	for i := range idxes {
127		j := idxes[i]
128		k := rhsIdxes[i]
129		if md.keys[j] != rhs.keys[k] || md.values[j] != rhs.values[k] {
130			return false
131		}
132	}
133	return true
134}
135
// Schema is a sequence of Field values, describing the columns of a table or
// a record batch.
type Schema struct {
	fields []Field          // the fields, in the order they were given to NewSchema
	index  map[string][]int // field name -> positions in fields of every field with that name
	meta   Metadata         // schema-level metadata; the zero Metadata when none was supplied
}
143
144// NewSchema returns a new Schema value from the slice of fields and metadata.
145//
146// NewSchema panics if there is a field with an invalid DataType.
147func NewSchema(fields []Field, metadata *Metadata) *Schema {
148	sc := &Schema{
149		fields: make([]Field, 0, len(fields)),
150		index:  make(map[string][]int, len(fields)),
151	}
152	if metadata != nil {
153		sc.meta = metadata.clone()
154	}
155	for i, field := range fields {
156		if field.Type == nil {
157			panic("arrow: field with nil DataType")
158		}
159		sc.fields = append(sc.fields, field)
160		sc.index[field.Name] = append(sc.index[field.Name], i)
161	}
162	return sc
163}
164
165func (sc *Schema) Metadata() Metadata { return sc.meta }
166func (sc *Schema) Fields() []Field    { return sc.fields }
167func (sc *Schema) Field(i int) Field  { return sc.fields[i] }
168
169func (sc *Schema) FieldsByName(n string) ([]Field, bool) {
170	indices, ok := sc.index[n]
171	if !ok {
172		return nil, ok
173	}
174	fields := make([]Field, 0, len(indices))
175	for _, v := range indices {
176		fields = append(fields, sc.fields[v])
177	}
178	return fields, ok
179}
180
181// FieldIndices returns the indices of the named field or nil.
182func (sc *Schema) FieldIndices(n string) []int {
183	return sc.index[n]
184}
185
186func (sc *Schema) HasField(n string) bool { return len(sc.FieldIndices(n)) > 0 }
187func (sc *Schema) HasMetadata() bool      { return len(sc.meta.keys) > 0 }
188
189// Equal returns whether two schema are equal.
190// Equal does not compare the metadata.
191func (sc *Schema) Equal(o *Schema) bool {
192	switch {
193	case sc == o:
194		return true
195	case sc == nil || o == nil:
196		return false
197	case len(sc.fields) != len(o.fields):
198		return false
199	}
200
201	for i := range sc.fields {
202		if !sc.fields[i].Equal(o.fields[i]) {
203			return false
204		}
205	}
206	return true
207}
208
209func (s *Schema) String() string {
210	o := new(strings.Builder)
211	fmt.Fprintf(o, "schema:\n  fields: %d\n", len(s.Fields()))
212	for i, f := range s.Fields() {
213		if i > 0 {
214			o.WriteString("\n")
215		}
216		fmt.Fprintf(o, "    - %v", f)
217	}
218	if meta := s.Metadata(); meta.Len() > 0 {
219		fmt.Fprintf(o, "\n  metadata: %v", meta)
220	}
221	return o.String()
222}
223
224func (s *Schema) Fingerprint() string {
225	if s == nil {
226		return ""
227	}
228
229	var b strings.Builder
230	b.WriteString("S{")
231	for _, f := range s.Fields() {
232		fieldFingerprint := f.Fingerprint()
233		if fieldFingerprint == "" {
234			return ""
235		}
236
237		b.WriteString(fieldFingerprint)
238		b.WriteByte(';')
239	}
240	// endianness
241	b.WriteByte('}')
242	return b.String()
243}
244