1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17package metadata_test
18
19import (
20	"context"
21	"testing"
22	"unsafe"
23
24	"github.com/apache/arrow/go/v6/parquet"
25	"github.com/apache/arrow/go/v6/parquet/metadata"
26	"github.com/apache/arrow/go/v6/parquet/schema"
27	"github.com/stretchr/testify/assert"
28	"github.com/stretchr/testify/require"
29)
30
31func generateTableMetaData(schema *schema.Schema, props *parquet.WriterProperties, nrows int64, statsInt, statsFloat metadata.EncodedStatistics) (*metadata.FileMetaData, error) {
32	fbuilder := metadata.NewFileMetadataBuilder(schema, props, nil)
33	rg1Builder := fbuilder.AppendRowGroup()
34	// metadata
35	// row group 1
36	col1Builder := rg1Builder.NextColumnChunk()
37	col2Builder := rg1Builder.NextColumnChunk()
38	// column metadata
39	dictEncodingStats := map[parquet.Encoding]int32{parquet.Encodings.RLEDict: 1}
40	dataEncodingStats := map[parquet.Encoding]int32{parquet.Encodings.Plain: 1, parquet.Encodings.RLE: 1}
41	statsInt.Signed = true
42	col1Builder.SetStats(statsInt)
43	statsFloat.Signed = true
44	col2Builder.SetStats(statsFloat)
45
46	col1Builder.Finish(metadata.ChunkMetaInfo{nrows / 2, 4, 0, 10, 512, 600}, true, false, metadata.EncodingStats{dictEncodingStats, dataEncodingStats}, nil)
47	col2Builder.Finish(metadata.ChunkMetaInfo{nrows / 2, 24, 0, 30, 512, 600}, true, false, metadata.EncodingStats{dictEncodingStats, dataEncodingStats}, nil)
48
49	rg1Builder.SetNumRows(nrows / 2)
50	rg1Builder.Finish(1024, -1)
51
52	// rowgroup2 metadata
53	rg2Builder := fbuilder.AppendRowGroup()
54	col1Builder = rg2Builder.NextColumnChunk()
55	col2Builder = rg2Builder.NextColumnChunk()
56	// column metadata
57	col1Builder.SetStats(statsInt)
58	col2Builder.SetStats(statsFloat)
59	dictEncodingStats = make(map[parquet.Encoding]int32)
60	col1Builder.Finish(metadata.ChunkMetaInfo{nrows / 2, 0 /*dictionary page offset*/, 0, 10, 512, 600}, false /* has dictionary */, false, metadata.EncodingStats{dictEncodingStats, dataEncodingStats}, nil)
61	col2Builder.Finish(metadata.ChunkMetaInfo{nrows / 2, 16, 0, 26, 512, 600}, true, false, metadata.EncodingStats{dictEncodingStats, dataEncodingStats}, nil)
62
63	rg2Builder.SetNumRows(nrows / 2)
64	rg2Builder.Finish(1024, -1)
65
66	return fbuilder.Finish()
67}
68
69func assertStatsSet(t *testing.T, m *metadata.ColumnChunkMetaData) {
70	ok, err := m.StatsSet()
71	assert.NoError(t, err)
72	assert.True(t, ok)
73}
74
75func assertStats(t *testing.T, m *metadata.ColumnChunkMetaData) metadata.TypedStatistics {
76	s, err := m.Statistics()
77	assert.NoError(t, err)
78	assert.NotNil(t, s)
79	return s
80}
81
82func TestBuildAccess(t *testing.T) {
83	props := parquet.NewWriterProperties(parquet.WithVersion(parquet.V2_LATEST))
84
85	fields := schema.FieldList{
86		schema.NewInt32Node("int_col", parquet.Repetitions.Required, -1),
87		schema.NewFloat32Node("float_col", parquet.Repetitions.Required, -1),
88	}
89	root, err := schema.NewGroupNode("schema", parquet.Repetitions.Repeated, fields, -1)
90	require.NoError(t, err)
91	schema := schema.NewSchema(root)
92
93	var (
94		nrows      int64   = 1000
95		intMin     int32   = 100
96		intMax     int32   = 200
97		floatMin   float32 = 100.100
98		floatMax   float32 = 200.200
99		statsInt   metadata.EncodedStatistics
100		statsFloat metadata.EncodedStatistics
101	)
102
103	statsInt.SetNullCount(0).
104		SetDistinctCount(nrows).
105		SetMin((*(*[4]byte)(unsafe.Pointer(&intMin)))[:]).
106		SetMax((*(*[4]byte)(unsafe.Pointer(&intMax)))[:])
107
108	statsFloat.SetNullCount(0).
109		SetDistinctCount(nrows).
110		SetMin((*(*[4]byte)(unsafe.Pointer(&floatMin)))[:]).
111		SetMax((*(*[4]byte)(unsafe.Pointer(&floatMax)))[:])
112
113	faccessor, err := generateTableMetaData(schema, props, nrows, statsInt, statsFloat)
114	require.NoError(t, err)
115	serialized, err := faccessor.SerializeString(context.Background())
116	assert.NoError(t, err)
117	faccessorCopy, err := metadata.NewFileMetaData([]byte(serialized), nil)
118	assert.NoError(t, err)
119
120	for _, accessor := range []*metadata.FileMetaData{faccessor, faccessorCopy} {
121		// file metadata
122		assert.Equal(t, nrows, accessor.NumRows)
123		assert.Len(t, accessor.RowGroups, 2)
124		assert.EqualValues(t, parquet.V2_LATEST, accessor.Version())
125		assert.Equal(t, parquet.DefaultCreatedBy, accessor.GetCreatedBy())
126		assert.Equal(t, 3, accessor.NumSchemaElements())
127
128		// row group 1 metadata
129		rg1Access := accessor.RowGroup(0)
130		assert.Equal(t, 2, rg1Access.NumColumns())
131		assert.Equal(t, nrows/2, rg1Access.NumRows())
132		assert.Equal(t, int64(1024), rg1Access.TotalByteSize())
133		assert.Equal(t, int64(1024), rg1Access.TotalCompressedSize())
134
135		rg1Col1, err := rg1Access.ColumnChunk(0)
136		assert.NoError(t, err)
137		assert.Equal(t, rg1Access.FileOffset(), rg1Col1.DictionaryPageOffset())
138
139		rg1Col2, err := rg1Access.ColumnChunk(1)
140		assert.NoError(t, err)
141		assertStatsSet(t, rg1Col1)
142		assertStatsSet(t, rg1Col2)
143		assert.Equal(t, statsInt.Min, assertStats(t, rg1Col1).EncodeMin())
144		assert.Equal(t, statsInt.Max, assertStats(t, rg1Col1).EncodeMax())
145		assert.Equal(t, statsFloat.Min, assertStats(t, rg1Col2).EncodeMin())
146		assert.Equal(t, statsFloat.Max, assertStats(t, rg1Col2).EncodeMax())
147		assert.Zero(t, assertStats(t, rg1Col1).NullCount())
148		assert.Zero(t, assertStats(t, rg1Col2).NullCount())
149		assert.Equal(t, nrows, assertStats(t, rg1Col1).DistinctCount())
150		assert.Equal(t, nrows, assertStats(t, rg1Col2).DistinctCount())
151		assert.Equal(t, metadata.DefaultCompressionType, rg1Col1.Compression())
152		assert.Equal(t, metadata.DefaultCompressionType, rg1Col2.Compression())
153		assert.Equal(t, nrows/2, rg1Col1.NumValues())
154		assert.Equal(t, nrows/2, rg1Col2.NumValues())
155		assert.Len(t, rg1Col1.Encodings(), 3)
156		assert.Len(t, rg1Col2.Encodings(), 3)
157		assert.EqualValues(t, 512, rg1Col1.TotalCompressedSize())
158		assert.EqualValues(t, 512, rg1Col2.TotalCompressedSize())
159		assert.EqualValues(t, 600, rg1Col1.TotalUncompressedSize())
160		assert.EqualValues(t, 600, rg1Col2.TotalUncompressedSize())
161		assert.EqualValues(t, 4, rg1Col1.DictionaryPageOffset())
162		assert.EqualValues(t, 24, rg1Col2.DictionaryPageOffset())
163		assert.EqualValues(t, 10, rg1Col1.DataPageOffset())
164		assert.EqualValues(t, 30, rg1Col2.DataPageOffset())
165		assert.Len(t, rg1Col1.EncodingStats(), 3)
166		assert.Len(t, rg1Col2.EncodingStats(), 3)
167
168		// row group 2 metadata
169		rg2Access := accessor.RowGroup(1)
170		assert.Equal(t, 2, rg2Access.NumColumns())
171		assert.Equal(t, nrows/2, rg2Access.NumRows())
172		assert.EqualValues(t, 1024, rg2Access.TotalByteSize())
173		assert.EqualValues(t, 1024, rg2Access.TotalCompressedSize())
174
175		rg2Col1, err := rg2Access.ColumnChunk(0)
176		assert.NoError(t, err)
177		assert.Equal(t, rg2Access.FileOffset(), rg2Col1.DataPageOffset())
178
179		rg2Col2, err := rg2Access.ColumnChunk(1)
180		assert.NoError(t, err)
181		assertStatsSet(t, rg1Col1)
182		assertStatsSet(t, rg1Col2)
183		assert.Equal(t, statsInt.Min, assertStats(t, rg1Col1).EncodeMin())
184		assert.Equal(t, statsInt.Max, assertStats(t, rg1Col1).EncodeMax())
185		assert.Equal(t, statsFloat.Min, assertStats(t, rg1Col2).EncodeMin())
186		assert.Equal(t, statsFloat.Max, assertStats(t, rg1Col2).EncodeMax())
187		assert.Zero(t, assertStats(t, rg1Col1).NullCount())
188		assert.Zero(t, assertStats(t, rg1Col2).NullCount())
189		assert.Equal(t, nrows, assertStats(t, rg1Col1).DistinctCount())
190		assert.Equal(t, nrows, assertStats(t, rg1Col2).DistinctCount())
191		assert.Equal(t, metadata.DefaultCompressionType, rg2Col1.Compression())
192		assert.Equal(t, metadata.DefaultCompressionType, rg2Col2.Compression())
193		assert.Equal(t, nrows/2, rg2Col1.NumValues())
194		assert.Equal(t, nrows/2, rg2Col2.NumValues())
195		assert.Len(t, rg2Col1.Encodings(), 2)
196		assert.Len(t, rg2Col2.Encodings(), 3)
197		assert.EqualValues(t, 512, rg2Col1.TotalCompressedSize())
198		assert.EqualValues(t, 512, rg2Col2.TotalCompressedSize())
199		assert.EqualValues(t, 600, rg2Col1.TotalUncompressedSize())
200		assert.EqualValues(t, 600, rg2Col2.TotalUncompressedSize())
201		assert.EqualValues(t, 0, rg2Col1.DictionaryPageOffset())
202		assert.EqualValues(t, 16, rg2Col2.DictionaryPageOffset())
203		assert.EqualValues(t, 10, rg2Col1.DataPageOffset())
204		assert.EqualValues(t, 26, rg2Col2.DataPageOffset())
205		assert.Len(t, rg2Col1.EncodingStats(), 2)
206		assert.Len(t, rg2Col2.EncodingStats(), 2)
207
208		assert.Empty(t, rg2Col1.FilePath())
209		accessor.SetFilePath("/foo/bar/bar.parquet")
210		assert.Equal(t, "/foo/bar/bar.parquet", rg2Col1.FilePath())
211	}
212
213	faccessor2, err := generateTableMetaData(schema, props, nrows, statsInt, statsFloat)
214	require.NoError(t, err)
215	faccessor.AppendRowGroups(faccessor2)
216	assert.Len(t, faccessor.RowGroups, 4)
217	assert.Equal(t, nrows*2, faccessor.NumRows)
218	assert.EqualValues(t, parquet.V2_LATEST, faccessor.Version())
219	assert.Equal(t, parquet.DefaultCreatedBy, faccessor.GetCreatedBy())
220	assert.Equal(t, 3, faccessor.NumSchemaElements())
221
222	faccessor1, err := faccessor.Subset([]int{2, 3})
223	require.NoError(t, err)
224	assert.True(t, faccessor1.Equals(faccessor2))
225
226	faccessor1, err = faccessor2.Subset([]int{0})
227	require.NoError(t, err)
228
229	next, err := faccessor.Subset([]int{0})
230	require.NoError(t, err)
231	faccessor1.AppendRowGroups(next)
232
233	sub, err := faccessor.Subset([]int{2, 0})
234	require.NoError(t, err)
235	assert.True(t, faccessor1.Equals(sub))
236}
237
238func TestV1VersionMetadata(t *testing.T) {
239	props := parquet.NewWriterProperties(parquet.WithVersion(parquet.V1_0))
240
241	fields := schema.FieldList{
242		schema.NewInt32Node("int_col", parquet.Repetitions.Required, -1),
243		schema.NewFloat32Node("float_col", parquet.Repetitions.Required, -1),
244	}
245	root, err := schema.NewGroupNode("schema", parquet.Repetitions.Repeated, fields, -1)
246	require.NoError(t, err)
247	schema := schema.NewSchema(root)
248
249	fbuilder := metadata.NewFileMetadataBuilder(schema, props, nil)
250	faccessor, err := fbuilder.Finish()
251	require.NoError(t, err)
252	assert.EqualValues(t, parquet.V1_0, faccessor.Version())
253}
254
255func TestKeyValueMetadata(t *testing.T) {
256	props := parquet.NewWriterProperties(parquet.WithVersion(parquet.V1_0))
257
258	fields := schema.FieldList{
259		schema.NewInt32Node("int_col", parquet.Repetitions.Required, -1),
260		schema.NewFloat32Node("float_col", parquet.Repetitions.Required, -1),
261	}
262	root, err := schema.NewGroupNode("schema", parquet.Repetitions.Repeated, fields, -1)
263	require.NoError(t, err)
264	schema := schema.NewSchema(root)
265	kvmeta := metadata.NewKeyValueMetadata()
266	kvmeta.Append("test_key", "test_value")
267
268	fbuilder := metadata.NewFileMetadataBuilder(schema, props, kvmeta)
269	faccessor, err := fbuilder.Finish()
270	require.NoError(t, err)
271
272	assert.True(t, faccessor.KeyValueMetadata().Equals(kvmeta))
273}
274
275func TestApplicationVersion(t *testing.T) {
276	version := metadata.NewAppVersion("parquet-mr version 1.7.9")
277	version1 := metadata.NewAppVersion("parquet-mr version 1.8.0")
278	version2 := metadata.NewAppVersion("parquet-cpp version 1.0.0")
279	version3 := metadata.NewAppVersion("")
280	version4 := metadata.NewAppVersion("parquet-mr version 1.5.0ab-cdh5.5.0+cd (build abcd)")
281	version5 := metadata.NewAppVersion("parquet-mr")
282
283	assert.Equal(t, "parquet-mr", version.App)
284	assert.Equal(t, 1, version.Version.Major)
285	assert.Equal(t, 7, version.Version.Minor)
286	assert.Equal(t, 9, version.Version.Patch)
287
288	assert.Equal(t, "parquet-cpp", version2.App)
289	assert.Equal(t, 1, version2.Version.Major)
290	assert.Equal(t, 0, version2.Version.Minor)
291	assert.Equal(t, 0, version2.Version.Patch)
292
293	assert.Equal(t, "parquet-mr", version4.App)
294	assert.Equal(t, "abcd", version4.Build)
295	assert.Equal(t, 1, version4.Version.Major)
296	assert.Equal(t, 5, version4.Version.Minor)
297	assert.Equal(t, 0, version4.Version.Patch)
298	assert.Equal(t, "ab", version4.Version.Unknown)
299	assert.Equal(t, "cdh5.5.0", version4.Version.PreRelease)
300	assert.Equal(t, "cd", version4.Version.BuildInfo)
301
302	assert.Equal(t, "parquet-mr", version5.App)
303	assert.Equal(t, 0, version5.Version.Major)
304	assert.Equal(t, 0, version5.Version.Minor)
305	assert.Equal(t, 0, version5.Version.Patch)
306
307	assert.True(t, version.LessThan(version1))
308
309	var stats metadata.EncodedStatistics
310	assert.False(t, version1.HasCorrectStatistics(parquet.Types.Int96, schema.NoLogicalType{}, stats, schema.SortUNKNOWN))
311	assert.True(t, version.HasCorrectStatistics(parquet.Types.Int32, schema.NoLogicalType{}, stats, schema.SortSIGNED))
312	assert.False(t, version.HasCorrectStatistics(parquet.Types.ByteArray, schema.NoLogicalType{}, stats, schema.SortSIGNED))
313	assert.True(t, version1.HasCorrectStatistics(parquet.Types.ByteArray, schema.NoLogicalType{}, stats, schema.SortSIGNED))
314	assert.False(t, version1.HasCorrectStatistics(parquet.Types.ByteArray, schema.NoLogicalType{}, stats, schema.SortUNSIGNED))
315	assert.True(t, version3.HasCorrectStatistics(parquet.Types.FixedLenByteArray, schema.NoLogicalType{}, stats, schema.SortSIGNED))
316
317	// check that the old stats are correct if min and max are the same regardless of sort order
318	var statsStr metadata.EncodedStatistics
319	statsStr.SetMin([]byte("a")).SetMax([]byte("b"))
320	assert.False(t, version1.HasCorrectStatistics(parquet.Types.ByteArray, schema.NoLogicalType{}, statsStr, schema.SortUNSIGNED))
321	statsStr.SetMax([]byte("a"))
322	assert.True(t, version1.HasCorrectStatistics(parquet.Types.ByteArray, schema.NoLogicalType{}, statsStr, schema.SortUNSIGNED))
323
324	// check that the same holds true for ints
325	var (
326		intMin int32 = 100
327		intMax int32 = 200
328	)
329	var statsInt metadata.EncodedStatistics
330	statsInt.SetMin((*(*[4]byte)(unsafe.Pointer(&intMin)))[:])
331	statsInt.SetMax((*(*[4]byte)(unsafe.Pointer(&intMax)))[:])
332	assert.False(t, version1.HasCorrectStatistics(parquet.Types.ByteArray, schema.NoLogicalType{}, statsInt, schema.SortUNSIGNED))
333	statsInt.SetMax((*(*[4]byte)(unsafe.Pointer(&intMin)))[:])
334	assert.True(t, version1.HasCorrectStatistics(parquet.Types.ByteArray, schema.NoLogicalType{}, statsInt, schema.SortUNSIGNED))
335}
336
337func TestCheckBadDecimalStats(t *testing.T) {
338	version1 := metadata.NewAppVersion("parquet-cpp version 3.0.0")
339	version2 := metadata.NewAppVersion("parquet-cpp-arrow version 3.0.0")
340	version3 := metadata.NewAppVersion("parquet-cpp-arrow version 4.0.0")
341
342	var stats metadata.EncodedStatistics
343	assert.False(t, version1.HasCorrectStatistics(parquet.Types.FixedLenByteArray, schema.NewDecimalLogicalType(5, 0), stats, schema.SortSIGNED))
344	assert.False(t, version2.HasCorrectStatistics(parquet.Types.FixedLenByteArray, schema.NewDecimalLogicalType(5, 0), stats, schema.SortSIGNED))
345	assert.True(t, version3.HasCorrectStatistics(parquet.Types.FixedLenByteArray, schema.NewDecimalLogicalType(5, 0), stats, schema.SortSIGNED))
346}
347