1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17package parquet_test
18
19import (
20	"testing"
21
22	"github.com/apache/arrow/go/v6/parquet"
23	"github.com/apache/arrow/go/v6/parquet/internal/encryption"
24	"github.com/stretchr/testify/assert"
25)
26
// Fixtures shared by the encryption/decryption property tests in this file.
const (
	// 16-character strings used as the footer key and the two per-column keys.
	FooterEncryptionKey  = "0123456789012345"
	ColumnEncryptionKey1 = "1234567890123450"
	ColumnEncryptionKey2 = "1234567890123451"

	// FileName is passed as the AAD prefix in the AAD-prefix tests.
	FileName = "tester"
)
33
34func TestColumnEncryptedWithOwnKey(t *testing.T) {
35	t.Parallel()
36
37	columnPath1 := "column_1"
38	colprops1 := parquet.NewColumnEncryptionProperties(columnPath1,
39		parquet.WithKey(ColumnEncryptionKey1), parquet.WithKeyID("kc1"))
40
41	assert.Equal(t, columnPath1, colprops1.ColumnPath())
42	assert.True(t, colprops1.IsEncrypted())
43	assert.False(t, colprops1.IsEncryptedWithFooterKey())
44	assert.Equal(t, ColumnEncryptionKey1, colprops1.Key())
45	assert.Equal(t, "kc1", colprops1.KeyMetadata())
46}
47
48func TestColumnEncryptedWithFooterKey(t *testing.T) {
49	t.Parallel()
50
51	colPath1 := "column_1"
52	colprops1 := parquet.NewColumnEncryptionProperties(colPath1)
53
54	assert.Equal(t, colPath1, colprops1.ColumnPath())
55	assert.True(t, colprops1.IsEncrypted())
56	assert.True(t, colprops1.IsEncryptedWithFooterKey())
57}
58
59func TestUniformEncryption(t *testing.T) {
60	t.Parallel()
61
62	props := parquet.NewFileEncryptionProperties(FooterEncryptionKey, parquet.WithFooterKeyMetadata("kf"))
63
64	assert.True(t, props.EncryptedFooter())
65	assert.Equal(t, parquet.DefaultEncryptionAlgorithm, props.Algorithm().Algo)
66	assert.Equal(t, FooterEncryptionKey, props.FooterKey())
67	assert.Equal(t, "kf", props.FooterKeyMetadata())
68
69	colPath := parquet.ColumnPathFromString("a_column")
70	outColProps := props.ColumnEncryptionProperties(colPath.String())
71
72	assert.True(t, outColProps.IsEncrypted())
73	assert.True(t, outColProps.IsEncryptedWithFooterKey())
74}
75
76func TestEncryptFooterAndTwoColumns(t *testing.T) {
77	t.Parallel()
78
79	columnPath1 := parquet.ColumnPathFromString("column_1")
80	columnPath2 := parquet.ColumnPathFromString("column_2")
81
82	encryptedColumns := make(parquet.ColumnPathToEncryptionPropsMap)
83	encryptedColumns[columnPath1.String()] = parquet.NewColumnEncryptionProperties(columnPath1.String(),
84		parquet.WithKey(ColumnEncryptionKey1), parquet.WithKeyID("kc1"))
85	encryptedColumns[columnPath2.String()] = parquet.NewColumnEncryptionProperties(columnPath2.String(),
86		parquet.WithKey(ColumnEncryptionKey2), parquet.WithKeyID("kc2"))
87
88	props := parquet.NewFileEncryptionProperties(FooterEncryptionKey,
89		parquet.WithFooterKeyMetadata("kf"), parquet.WithEncryptedColumns(encryptedColumns))
90
91	assert.True(t, props.EncryptedFooter())
92	assert.Equal(t, parquet.DefaultEncryptionAlgorithm, props.Algorithm().Algo)
93	assert.Equal(t, FooterEncryptionKey, props.FooterKey())
94
95	outColProps1 := props.ColumnEncryptionProperties(columnPath1.String())
96	assert.Equal(t, columnPath1.String(), outColProps1.ColumnPath())
97	assert.True(t, outColProps1.IsEncrypted())
98	assert.False(t, outColProps1.IsEncryptedWithFooterKey())
99	assert.Equal(t, ColumnEncryptionKey1, outColProps1.Key())
100	assert.Equal(t, "kc1", outColProps1.KeyMetadata())
101
102	outColProps2 := props.ColumnEncryptionProperties(columnPath2.String())
103	assert.Equal(t, columnPath2.String(), outColProps2.ColumnPath())
104	assert.True(t, outColProps2.IsEncrypted())
105	assert.False(t, outColProps2.IsEncryptedWithFooterKey())
106	assert.Equal(t, ColumnEncryptionKey2, outColProps2.Key())
107	assert.Equal(t, "kc2", outColProps2.KeyMetadata())
108
109	columnPath3 := parquet.ColumnPathFromString("column_3")
110	outColProps3 := props.ColumnEncryptionProperties(columnPath3.String())
111	assert.Nil(t, outColProps3)
112}
113
114func TestEncryptTwoColumnsNotFooter(t *testing.T) {
115	t.Parallel()
116
117	columnPath1 := parquet.ColumnPathFromString("column_1")
118	columnPath2 := parquet.ColumnPathFromString("column_2")
119
120	encryptedColumns := make(parquet.ColumnPathToEncryptionPropsMap)
121	encryptedColumns[columnPath1.String()] = parquet.NewColumnEncryptionProperties(columnPath1.String(),
122		parquet.WithKey(ColumnEncryptionKey1), parquet.WithKeyID("kc1"))
123	encryptedColumns[columnPath2.String()] = parquet.NewColumnEncryptionProperties(columnPath2.String(),
124		parquet.WithKey(ColumnEncryptionKey2), parquet.WithKeyID("kc2"))
125
126	props := parquet.NewFileEncryptionProperties(FooterEncryptionKey,
127		parquet.WithFooterKeyMetadata("kf"), parquet.WithPlaintextFooter(), parquet.WithEncryptedColumns(encryptedColumns))
128
129	assert.False(t, props.EncryptedFooter())
130	assert.Equal(t, parquet.DefaultEncryptionAlgorithm, props.Algorithm().Algo)
131	assert.Equal(t, FooterEncryptionKey, props.FooterKey())
132
133	outColProps1 := props.ColumnEncryptionProperties(columnPath1.String())
134	assert.Equal(t, columnPath1.String(), outColProps1.ColumnPath())
135	assert.True(t, outColProps1.IsEncrypted())
136	assert.False(t, outColProps1.IsEncryptedWithFooterKey())
137	assert.Equal(t, ColumnEncryptionKey1, outColProps1.Key())
138	assert.Equal(t, "kc1", outColProps1.KeyMetadata())
139
140	outColProps2 := props.ColumnEncryptionProperties(columnPath2.String())
141	assert.Equal(t, columnPath2.String(), outColProps2.ColumnPath())
142	assert.True(t, outColProps2.IsEncrypted())
143	assert.False(t, outColProps2.IsEncryptedWithFooterKey())
144	assert.Equal(t, ColumnEncryptionKey2, outColProps2.Key())
145	assert.Equal(t, "kc2", outColProps2.KeyMetadata())
146
147	columnPath3 := "column_3"
148	outColProps3 := props.ColumnEncryptionProperties(columnPath3)
149	assert.Nil(t, outColProps3)
150}
151
152func TestUseAadPrefix(t *testing.T) {
153	t.Parallel()
154
155	props := parquet.NewFileEncryptionProperties(FooterEncryptionKey, parquet.WithAadPrefix(FileName))
156
157	assert.Equal(t, FileName, string(props.Algorithm().Aad.AadPrefix))
158	assert.False(t, props.Algorithm().Aad.SupplyAadPrefix)
159}
160
161func TestUseAadPrefixNotStoreInFile(t *testing.T) {
162	t.Parallel()
163
164	props := parquet.NewFileEncryptionProperties(FooterEncryptionKey,
165		parquet.WithAadPrefix(FileName), parquet.DisableAadPrefixStorage())
166
167	assert.Empty(t, props.Algorithm().Aad.AadPrefix)
168	assert.True(t, props.Algorithm().Aad.SupplyAadPrefix)
169}
170
171func TestUseAES_GCM_CTR_V1Algo(t *testing.T) {
172	t.Parallel()
173
174	props := parquet.NewFileEncryptionProperties(FooterEncryptionKey,
175		parquet.WithAlg(parquet.AesCtr))
176
177	assert.Equal(t, parquet.AesCtr, props.Algorithm().Algo)
178}
179
180func TestUseKeyRetriever(t *testing.T) {
181	t.Parallel()
182
183	stringKr1 := make(encryption.StringKeyIDRetriever)
184	stringKr1.PutKey("kf", FooterEncryptionKey)
185	stringKr1.PutKey("kc1", ColumnEncryptionKey1)
186	stringKr1.PutKey("kc2", ColumnEncryptionKey2)
187
188	props := parquet.NewFileDecryptionProperties(parquet.WithKeyRetriever(stringKr1))
189	assert.Equal(t, FooterEncryptionKey, props.KeyRetriever.GetKey([]byte("kf")))
190	assert.Equal(t, ColumnEncryptionKey1, props.KeyRetriever.GetKey([]byte("kc1")))
191	assert.Equal(t, ColumnEncryptionKey2, props.KeyRetriever.GetKey([]byte("kc2")))
192}
193
194func TestSupplyAadPrefix(t *testing.T) {
195	props := parquet.NewFileDecryptionProperties(
196		parquet.WithFooterKey(FooterEncryptionKey), parquet.WithDecryptAadPrefix(FileName))
197	assert.Equal(t, FileName, props.AadPrefix())
198}
199
200func TestSetKey(t *testing.T) {
201	columnPath1 := parquet.ColumnPathFromString("column_1")
202	props := parquet.NewColumnDecryptionProperties(columnPath1.String(), parquet.WithDecryptKey(ColumnEncryptionKey1))
203	assert.Equal(t, ColumnEncryptionKey1, props.Key())
204}
205
206func TestUsingExplicitFooterAndColumnKeys(t *testing.T) {
207	colPath1 := "column_1"
208	colPath2 := "column_2"
209	decryptCols := make(parquet.ColumnPathToDecryptionPropsMap)
210	decryptCols[colPath1] = parquet.NewColumnDecryptionProperties(colPath1, parquet.WithDecryptKey(ColumnEncryptionKey1))
211	decryptCols[colPath2] = parquet.NewColumnDecryptionProperties(colPath2, parquet.WithDecryptKey(ColumnEncryptionKey2))
212
213	props := parquet.NewFileDecryptionProperties(parquet.WithFooterKey(FooterEncryptionKey), parquet.WithColumnKeys(decryptCols))
214	assert.Equal(t, FooterEncryptionKey, props.FooterKey())
215	assert.Equal(t, ColumnEncryptionKey1, props.ColumnKey(colPath1))
216	assert.Equal(t, ColumnEncryptionKey2, props.ColumnKey(colPath2))
217}
218