1// Licensed to the Apache Software Foundation (ASF) under one 2// or more contributor license agreements. See the NOTICE file 3// distributed with this work for additional information 4// regarding copyright ownership. The ASF licenses this file 5// to you under the Apache License, Version 2.0 (the 6// "License"); you may not use this file except in compliance 7// with the License. You may obtain a copy of the License at 8// 9// http://www.apache.org/licenses/LICENSE-2.0 10// 11// Unless required by applicable law or agreed to in writing, software 12// distributed under the License is distributed on an "AS IS" BASIS, 13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 14// See the License for the specific language governing permissions and 15// limitations under the License. 16 17package schema 18 19import ( 20 "reflect" 21 "strconv" 22 "strings" 23 24 "github.com/apache/arrow/go/v6/parquet" 25 format "github.com/apache/arrow/go/v6/parquet/internal/gen-go/parquet" 26 "golang.org/x/xerrors" 27) 28 29type taggedInfo struct { 30 Name string 31 32 Type parquet.Type 33 KeyType parquet.Type 34 ValueType parquet.Type 35 36 Length int32 37 KeyLength int32 38 ValueLength int32 39 40 Scale int32 41 KeyScale int32 42 ValueScale int32 43 44 Precision int32 45 KeyPrecision int32 46 ValuePrecision int32 47 48 FieldID int32 49 KeyFieldID int32 50 ValueFieldID int32 51 52 RepetitionType parquet.Repetition 53 ValueRepetition parquet.Repetition 54 55 Converted ConvertedType 56 KeyConverted ConvertedType 57 ValueConverted ConvertedType 58 59 LogicalFields map[string]string 60 KeyLogicalFields map[string]string 61 ValueLogicalFields map[string]string 62 63 LogicalType LogicalType 64 KeyLogicalType LogicalType 65 ValueLogicalType LogicalType 66} 67 68func (t *taggedInfo) CopyForKey() (ret taggedInfo) { 69 ret = *t 70 ret.Type = t.KeyType 71 ret.Length = t.KeyLength 72 ret.Scale = t.KeyScale 73 ret.Precision = t.KeyPrecision 74 ret.FieldID = t.KeyFieldID 75 ret.RepetitionType = parquet.Repetitions.Required 76 ret.Converted = t.KeyConverted 77 ret.LogicalType = t.KeyLogicalType 78 return 79} 80 81func (t *taggedInfo) CopyForValue() (ret taggedInfo) { 82 ret = *t 83 ret.Type = t.ValueType 84 ret.Length = t.ValueLength 85 ret.Scale = t.ValueScale 86 ret.Precision = t.ValuePrecision 87 ret.FieldID = t.ValueFieldID 88 ret.RepetitionType = t.ValueRepetition 89 ret.Converted = t.ValueConverted 90 ret.LogicalType = t.ValueLogicalType 91 return 92} 93 94func (t *taggedInfo) UpdateLogicalTypes() { 95 processLogicalType := func(fields map[string]string, precision, scale int32) LogicalType { 96 t, ok := fields["type"] 97 if !ok { 98 return NoLogicalType{} 99 } 100 101 switch strings.ToLower(t) { 102 case "string": 103 return StringLogicalType{} 104 case "map": 105 return MapLogicalType{} 106 case "list": 107 return ListLogicalType{} 108 case "enum": 109 return EnumLogicalType{} 110 case "decimal": 111 if v, ok := fields["precision"]; ok { 112 precision = int32FromType(v) 113 } 114 if v, ok := fields["scale"]; ok { 115 scale = int32FromType(v) 116 } 117 return NewDecimalLogicalType(precision, scale) 118 case "date": 119 return DateLogicalType{} 120 case "time": 121 unit, ok := fields["unit"] 122 if !ok { 123 panic("must specify unit for time logical type") 124 } 125 adjustedToUtc, ok := fields["isadjustedutc"] 126 if !ok { 127 adjustedToUtc = "true" 128 } 129 return NewTimeLogicalType(boolFromStr(adjustedToUtc), timeUnitFromString(strings.ToLower(unit))) 130 case "timestamp": 131 unit, ok := fields["unit"] 132 if !ok { 133 panic("must specify unit for time logical type") 134 } 135 adjustedToUtc, ok := fields["isadjustedutc"] 136 if !ok { 137 adjustedToUtc = "true" 138 } 139 return NewTimestampLogicalType(boolFromStr(adjustedToUtc), timeUnitFromString(unit)) 140 case "integer": 141 width, ok := fields["bitwidth"] 142 if !ok { 143 panic("must specify bitwidth if explicitly setting integer logical type") 144 } 145 signed, ok := fields["signed"] 146 if !ok { 147 signed = "true" 148 } 149 150 return NewIntLogicalType(int8(int32FromType(width)), boolFromStr(signed)) 151 case "null": 152 return NullLogicalType{} 153 case "json": 154 return JSONLogicalType{} 155 case "bson": 156 return BSONLogicalType{} 157 case "uuid": 158 return UUIDLogicalType{} 159 default: 160 panic(xerrors.Errorf("invalid logical type specified: %s", t)) 161 } 162 } 163 164 t.LogicalType = processLogicalType(t.LogicalFields, t.Precision, t.Scale) 165 t.KeyLogicalType = processLogicalType(t.KeyLogicalFields, t.KeyPrecision, t.KeyScale) 166 t.ValueLogicalType = processLogicalType(t.ValueLogicalFields, t.ValuePrecision, t.ValueScale) 167} 168 169func newTaggedInfo() taggedInfo { 170 return taggedInfo{ 171 Type: parquet.Types.Undefined, 172 KeyType: parquet.Types.Undefined, 173 ValueType: parquet.Types.Undefined, 174 RepetitionType: parquet.Repetitions.Undefined, 175 ValueRepetition: parquet.Repetitions.Undefined, 176 Converted: ConvertedTypes.NA, 177 KeyConverted: ConvertedTypes.NA, 178 ValueConverted: ConvertedTypes.NA, 179 FieldID: -1, 180 KeyFieldID: -1, 181 ValueFieldID: -1, 182 LogicalFields: make(map[string]string), 183 KeyLogicalFields: make(map[string]string), 184 ValueLogicalFields: make(map[string]string), 185 LogicalType: NoLogicalType{}, 186 KeyLogicalType: NoLogicalType{}, 187 ValueLogicalType: NoLogicalType{}, 188 } 189} 190 191var int32FromType = func(v string) int32 { 192 val, err := strconv.Atoi(v) 193 if err != nil { 194 panic(err) 195 } 196 return int32(val) 197} 198 199var boolFromStr = func(v string) bool { 200 val, err := strconv.ParseBool(v) 201 if err != nil { 202 panic(err) 203 } 204 return val 205} 206 207func infoFromTags(f reflect.StructTag) *taggedInfo { 208 typeFromStr := func(v string) parquet.Type { 209 t, err := format.TypeFromString(strings.ToUpper(v)) 210 if err != nil { 211 panic(xerrors.Errorf("invalid type specified: %s", v)) 212 } 213 return parquet.Type(t) 214 } 215 216 repFromStr := func(v string) parquet.Repetition { 217 r, err := format.FieldRepetitionTypeFromString(strings.ToUpper(v)) 218 if err != nil { 219 panic(err) 220 } 221 return parquet.Repetition(r) 222 } 223 224 convertedFromStr := func(v string) ConvertedType { 225 c, err := format.ConvertedTypeFromString(strings.ToUpper(v)) 226 if err != nil { 227 panic(err) 228 } 229 return ConvertedType(c) 230 } 231 232 if ptags, ok := f.Lookup("parquet"); ok { 233 info := newTaggedInfo() 234 for _, tag := range strings.Split(strings.Replace(ptags, "\t", "", -1), ",") { 235 tag = strings.TrimSpace(tag) 236 kv := strings.SplitN(tag, "=", 2) 237 key := strings.TrimSpace(strings.ToLower(kv[0])) 238 value := strings.TrimSpace(kv[1]) 239 240 switch key { 241 case "name": 242 info.Name = value 243 case "type": 244 info.Type = typeFromStr(value) 245 case "keytype": 246 info.KeyType = typeFromStr(value) 247 case "valuetype": 248 info.ValueType = typeFromStr(value) 249 case "length": 250 info.Length = int32FromType(value) 251 case "keylength": 252 info.KeyLength = int32FromType(value) 253 case "valuelength": 254 info.ValueLength = int32FromType(value) 255 case "scale": 256 info.Scale = int32FromType(value) 257 case "keyscale": 258 info.KeyScale = int32FromType(value) 259 case "valuescale": 260 info.ValueScale = int32FromType(value) 261 case "precision": 262 info.Precision = int32FromType(value) 263 case "keyprecision": 264 info.KeyPrecision = int32FromType(value) 265 case "valueprecision": 266 info.ValuePrecision = int32FromType(value) 267 case "fieldid": 268 info.FieldID = int32FromType(value) 269 case "keyfieldid": 270 info.KeyFieldID = int32FromType(value) 271 case "valuefieldid": 272 info.ValueFieldID = int32FromType(value) 273 case "repetition": 274 info.RepetitionType = repFromStr(value) 275 case "valuerepetition": 276 info.ValueRepetition = repFromStr(value) 277 case "converted": 278 info.Converted = convertedFromStr(value) 279 case "keyconverted": 280 info.KeyConverted = convertedFromStr(value) 281 case "valueconverted": 282 info.ValueConverted = convertedFromStr(value) 283 case "logical": 284 info.LogicalFields["type"] = value 285 case "keylogical": 286 info.KeyLogicalFields["type"] = value 287 case "valuelogical": 288 info.ValueLogicalFields["type"] = value 289 default: 290 switch { 291 case strings.HasPrefix(key, "logical."): 292 info.LogicalFields[strings.TrimPrefix(key, "logical.")] = value 293 case strings.HasPrefix(key, "keylogical."): 294 info.KeyLogicalFields[strings.TrimPrefix(key, "keylogical.")] = value 295 case strings.HasPrefix(key, "valuelogical."): 296 info.ValueLogicalFields[strings.TrimPrefix(key, "valuelogical.")] = value 297 } 298 } 299 } 300 info.UpdateLogicalTypes() 301 return &info 302 } 303 return nil 304} 305 306// typeToNode recurseively converts a physical type and the tag info into parquet Nodes 307// 308// to avoid having to propagate errors up potentially high numbers of recursive calls 309// we use panics and then recover in the public function NewSchemaFromStruct so that a 310// failure very far down the stack quickly unwinds. 311func typeToNode(name string, typ reflect.Type, repType parquet.Repetition, info *taggedInfo) Node { 312 // set up our default values for everything 313 var ( 314 converted = ConvertedTypes.None 315 logical LogicalType = NoLogicalType{} 316 fieldID = int32(-1) 317 physical = parquet.Types.Undefined 318 typeLen = 0 319 precision = 0 320 scale = 0 321 ) 322 if info != nil { // we have struct tag info to process 323 fieldID = info.FieldID 324 if info.Converted != ConvertedTypes.NA { 325 converted = info.Converted 326 } 327 logical = info.LogicalType 328 physical = info.Type 329 typeLen = int(info.Length) 330 precision = int(info.Precision) 331 scale = int(info.Scale) 332 333 if info.Name != "" { 334 name = info.Name 335 } 336 if info.RepetitionType != parquet.Repetitions.Undefined { 337 repType = info.RepetitionType 338 } 339 } 340 341 // simplify the logic by switching based on the reflection Kind 342 switch typ.Kind() { 343 case reflect.Map: 344 // a map must have a logical type of MAP or have no tag for logical type in which case 345 // we assume MAP logical type. 346 if !logical.IsNone() && !logical.Equals(MapLogicalType{}) { 347 panic("cannot set logical type to something other than map for a map") 348 } 349 350 infoCopy := newTaggedInfo() 351 if info != nil { // populate any value specific tags to propagate for the value type 352 infoCopy = info.CopyForValue() 353 } 354 355 // create the node for the value type of the map 356 value := typeToNode("value", typ.Elem(), parquet.Repetitions.Required, &infoCopy) 357 if info != nil { // change our copy to now use the key specific tags if they exist 358 infoCopy = info.CopyForKey() 359 } 360 361 // create the node for the key type of the map 362 key := typeToNode("key", typ.Key(), parquet.Repetitions.Required, &infoCopy) 363 if key.RepetitionType() != parquet.Repetitions.Required { // key cannot be optional 364 panic("key type of map must be Required") 365 } 366 return Must(MapOf(name, key, value, repType, fieldID)) 367 case reflect.Struct: 368 // structs are Group nodes 369 fields := make(FieldList, 0) 370 for i := 0; i < typ.NumField(); i++ { 371 f := typ.Field(i) 372 373 fields = append(fields, typeToNode(f.Name, f.Type, parquet.Repetitions.Required, infoFromTags(f.Tag))) 374 } 375 // group nodes don't have a physical type 376 if physical != parquet.Types.Undefined { 377 panic("cannot specify custom type on struct") 378 } 379 // group nodes don't have converted or logical types 380 if converted != ConvertedTypes.None { 381 panic("cannot specify converted types for a struct") 382 } 383 if !logical.IsNone() { 384 panic("cannot specify logicaltype for a struct") 385 } 386 return Must(NewGroupNode(name, repType, fields, fieldID)) 387 case reflect.Ptr: // if we encounter a pointer create a node for the type it points to, but mark it as optional 388 return typeToNode(name, typ.Elem(), parquet.Repetitions.Optional, info) 389 case reflect.Array: 390 // arrays are repeated or fixed size 391 if typ == reflect.TypeOf(parquet.Int96{}) { 392 return NewInt96Node(name, repType, fieldID) 393 } 394 395 if typ.Elem() == reflect.TypeOf(byte(0)) { // something like [12]byte translates to FixedLenByteArray with length 12 396 if physical == parquet.Types.Undefined { 397 physical = parquet.Types.FixedLenByteArray 398 } 399 if typeLen == 0 { // if there was no type length specified in the tag, use the length of the type. 400 typeLen = typ.Len() 401 } 402 if !logical.IsNone() { 403 return MustPrimitive(NewPrimitiveNodeLogical(name, repType, logical, physical, typeLen, fieldID)) 404 } 405 return MustPrimitive(NewPrimitiveNodeConverted(name, repType, physical, converted, typeLen, precision, scale, fieldID)) 406 } 407 fallthrough // if it's not a fixed len byte array type, then just treat it like a slice 408 case reflect.Slice: 409 // for slices, we default to treating them as lists unless the repetition type is set to REPEATED or they are 410 // a bytearray/fixedlenbytearray 411 switch { 412 case repType == parquet.Repetitions.Repeated: 413 return typeToNode(name, typ.Elem(), parquet.Repetitions.Repeated, info) 414 case physical == parquet.Types.FixedLenByteArray || physical == parquet.Types.ByteArray: 415 if typ.Elem() != reflect.TypeOf(byte(0)) { 416 panic("slice with physical type ByteArray or FixedLenByteArray must be []byte") 417 } 418 fallthrough 419 case typ.Elem() == reflect.TypeOf(byte(0)): 420 if physical == parquet.Types.Undefined { 421 physical = parquet.Types.ByteArray 422 } 423 if !logical.IsNone() { 424 return MustPrimitive(NewPrimitiveNodeLogical(name, repType, logical, physical, typeLen, fieldID)) 425 } 426 return MustPrimitive(NewPrimitiveNodeConverted(name, repType, physical, converted, typeLen, precision, scale, fieldID)) 427 default: 428 var elemInfo *taggedInfo 429 if info != nil { 430 elemInfo = &taggedInfo{} 431 *elemInfo = info.CopyForValue() 432 } 433 434 if !logical.IsNone() && !logical.Equals(ListLogicalType{}) { 435 panic("slice must either be repeated or a List type") 436 } 437 if converted != ConvertedTypes.None && converted != ConvertedTypes.List { 438 panic("slice must either be repeated or a List type") 439 } 440 return Must(ListOf(typeToNode(name, typ.Elem(), parquet.Repetitions.Required, elemInfo), repType, fieldID)) 441 } 442 case reflect.String: 443 // strings are byte arrays or fixedlen byte array 444 t := parquet.Types.ByteArray 445 switch physical { 446 case parquet.Types.Undefined, parquet.Types.ByteArray: 447 case parquet.Types.FixedLenByteArray: 448 t = parquet.Types.FixedLenByteArray 449 default: 450 panic("string fields should be of type bytearray or fixedlenbytearray only") 451 } 452 453 if !logical.IsNone() { 454 return MustPrimitive(NewPrimitiveNodeLogical(name, repType, logical, t, typeLen, fieldID)) 455 } 456 457 return MustPrimitive(NewPrimitiveNodeConverted(name, repType, t, converted, typeLen, precision, scale, fieldID)) 458 case reflect.Int, reflect.Int32, reflect.Int8, reflect.Int16, reflect.Int64: 459 // handle integer types, default to setting the corresponding logical type 460 ptyp := parquet.Types.Int32 461 if typ.Bits() == 64 { 462 ptyp = parquet.Types.Int64 463 } 464 465 if physical != parquet.Types.Undefined { 466 ptyp = physical 467 } 468 469 if !logical.IsNone() { 470 return MustPrimitive(NewPrimitiveNodeLogical(name, repType, logical, ptyp, typeLen, fieldID)) 471 } 472 473 bitwidth := int8(typ.Bits()) 474 if physical != parquet.Types.Undefined { 475 if ptyp == parquet.Types.Int32 { 476 bitwidth = 32 477 } else if ptyp == parquet.Types.Int64 { 478 bitwidth = 64 479 } 480 } 481 482 if converted != ConvertedTypes.None { 483 return MustPrimitive(NewPrimitiveNodeConverted(name, repType, ptyp, converted, 0, precision, scale, fieldID)) 484 } 485 486 return MustPrimitive(NewPrimitiveNodeLogical(name, repType, NewIntLogicalType(bitwidth, true), ptyp, 0, fieldID)) 487 case reflect.Uint, reflect.Uint32, reflect.Uint8, reflect.Uint16, reflect.Uint64: 488 // handle unsigned integer types and default to the corresponding logical type for it. 489 ptyp := parquet.Types.Int32 490 if typ.Bits() == 64 { 491 ptyp = parquet.Types.Int64 492 } 493 494 if physical != parquet.Types.Undefined { 495 ptyp = physical 496 } 497 498 if !logical.IsNone() { 499 return MustPrimitive(NewPrimitiveNodeLogical(name, repType, logical, ptyp, typeLen, fieldID)) 500 } 501 502 bitwidth := int8(typ.Bits()) 503 if physical != parquet.Types.Undefined { 504 if ptyp == parquet.Types.Int32 { 505 bitwidth = 32 506 } else if ptyp == parquet.Types.Int64 { 507 bitwidth = 64 508 } 509 } 510 511 if converted != ConvertedTypes.None { 512 return MustPrimitive(NewPrimitiveNodeConverted(name, repType, ptyp, converted, 0, precision, scale, fieldID)) 513 } 514 515 return MustPrimitive(NewPrimitiveNodeLogical(name, repType, NewIntLogicalType(bitwidth, false), ptyp, 0, fieldID)) 516 case reflect.Bool: 517 if !logical.IsNone() { 518 return MustPrimitive(NewPrimitiveNodeLogical(name, repType, logical, parquet.Types.Boolean, typeLen, fieldID)) 519 } 520 return MustPrimitive(NewPrimitiveNodeConverted(name, repType, parquet.Types.Boolean, converted, typeLen, precision, scale, fieldID)) 521 case reflect.Float32: 522 if !logical.IsNone() { 523 return MustPrimitive(NewPrimitiveNodeLogical(name, repType, logical, parquet.Types.Float, typeLen, fieldID)) 524 } 525 return MustPrimitive(NewPrimitiveNodeConverted(name, repType, parquet.Types.Float, converted, typeLen, precision, scale, fieldID)) 526 case reflect.Float64: 527 if !logical.IsNone() { 528 return MustPrimitive(NewPrimitiveNodeLogical(name, repType, logical, parquet.Types.Double, typeLen, fieldID)) 529 } 530 return MustPrimitive(NewPrimitiveNodeConverted(name, repType, parquet.Types.Double, converted, typeLen, precision, scale, fieldID)) 531 } 532 return nil 533} 534 535// NewSchemaFromStruct generates a schema from an object type via reflection of 536// the type and reading struct tags for "parquet". 537// 538// Rules 539// 540// Everything defaults to Required repetition, unless otherwise specified. 541// Pointer types become Optional repetition. 542// Arrays and Slices become logical List types unless using the tag `repetition=repeated`. 543// 544// A length specified byte field (like [5]byte) becomes a fixed_len_byte_array of that length 545// unless otherwise specified by tags. 546// 547// string and []byte both become ByteArray unless otherwise specified. 548// 549// Integer types will default to having a logical type of the appropriate bit width 550// and signedness rather than having no logical type, ie: an int8 will become an int32 551// node with logical type Int(bitWidth=8, signed=true). 552// 553// Structs will become group nodes with the fields of the struct as the fields of the group, 554// recursively creating the nodes. 555// 556// maps will become appropriate Map structures in the schema of the defined key and values. 557// 558// Available Tags 559// 560// name: by default the node will have the same name as the field, this tag let's you specify a name 561// 562// type: Specify the physical type instead of using the field type 563// 564// length: specify the type length of the node, only relevant for fixed_len_byte_array 565// 566// scale: specify the scale for a decimal field 567// 568// precision: specify the precision for a decimal field 569// 570// fieldid: specify the field ID for that node, defaults to -1 which means it is not set in the parquet file. 571// 572// repetition: specify the repetition as something other than what is determined by the type 573// 574// converted: specify the Converted Type of the field 575// 576// logical: specify the logical type of the field, if using decimal then the scale and precision 577// will be determined by the precision and scale fields, or by the logical.precision / logical.scale fields 578// with the logical. prefixed versions taking precedence. For Time or Timestamp logical types, 579// use logical.unit=<millis|micros|nanos> and logical.isadjustedutc=<true|false> to set those. Unit is required 580// isadjustedutc defaults to true. For Integer logical type, use logical.bitwidth and logical.signed to specify 581// those values, with bitwidth being required, and signed defaulting to true. 582// 583// All tags other than name can use a prefix of "key<tagname>=<value>" to refer to the type of the key for a map 584// and "value<tagname>=<value>" to refer to the value type of a map or the element of a list (such as the type of a slice) 585func NewSchemaFromStruct(obj interface{}) (sc *Schema, err error) { 586 ot := reflect.TypeOf(obj) 587 if ot.Kind() == reflect.Ptr { 588 ot = ot.Elem() 589 } 590 591 // typeToNode uses panics to fail fast / fail early instead of propagating 592 // errors up recursive stacks. so we recover here and return it as an error 593 defer func() { 594 if r := recover(); r != nil { 595 sc = nil 596 switch x := r.(type) { 597 case string: 598 err = xerrors.New(x) 599 case error: 600 err = x 601 default: 602 err = xerrors.New("unknown panic") 603 } 604 } 605 }() 606 607 root := typeToNode(ot.Name(), ot, parquet.Repetitions.Repeated, nil) 608 return NewSchema(root.(*GroupNode)), nil 609} 610 611var parquetTypeToReflect = map[parquet.Type]reflect.Type{ 612 parquet.Types.Boolean: reflect.TypeOf(true), 613 parquet.Types.Int32: reflect.TypeOf(int32(0)), 614 parquet.Types.Int64: reflect.TypeOf(int64(0)), 615 parquet.Types.Float: reflect.TypeOf(float32(0)), 616 parquet.Types.Double: reflect.TypeOf(float64(0)), 617 parquet.Types.Int96: reflect.TypeOf(parquet.Int96{}), 618 parquet.Types.ByteArray: reflect.TypeOf(parquet.ByteArray{}), 619 parquet.Types.FixedLenByteArray: reflect.TypeOf(parquet.FixedLenByteArray{}), 620} 621 622func typeFromNode(n Node) reflect.Type { 623 switch n.Type() { 624 case Primitive: 625 typ := parquetTypeToReflect[n.(*PrimitiveNode).PhysicalType()] 626 // if a bytearray field is annoted as a String logical type or a UTF8 converted type 627 // then use a string instead of parquet.ByteArray / parquet.FixedLenByteArray which are []byte 628 if n.LogicalType().Equals(StringLogicalType{}) || n.ConvertedType() == ConvertedTypes.UTF8 { 629 typ = reflect.TypeOf(string("")) 630 } 631 632 if n.RepetitionType() == parquet.Repetitions.Optional { 633 typ = reflect.PtrTo(typ) 634 } else if n.RepetitionType() == parquet.Repetitions.Repeated { 635 typ = reflect.SliceOf(typ) 636 } 637 638 return typ 639 case Group: 640 gnode := n.(*GroupNode) 641 switch gnode.ConvertedType() { 642 case ConvertedTypes.List: 643 // According to the Parquet Spec, a list should always be a 3-level structure 644 // 645 // <list-repetition> group <name> (LIST) { 646 // repeated group list { 647 // <element-repetition> <element-type> element; 648 // } 649 // } 650 // 651 // Outer-most level must be a group annotated with LIST containing a single field named "list". 652 // this level must be only optional (if the list is nullable) or required 653 // Middle level, named list, must be repeated group with a single field named "element" 654 // "element" field is the lists element type and repetition, which should be only required or optional 655 656 if gnode.fields.Len() != 1 { 657 panic("invalid list node, should have exactly 1 child.") 658 } 659 660 if gnode.fields[0].RepetitionType() != parquet.Repetitions.Repeated { 661 panic("invalid list node, child should be repeated") 662 } 663 664 // it is required that the repeated group of elements is named "list" and it's element 665 // field is named "element", however existing data may not use this so readers shouldn't 666 // enforce them as errors 667 // 668 // Rules for backward compatibility from the parquet spec: 669 // 670 // 1) if the repeated field is not a group, then it's type is the element type and elements 671 // must be required. 672 // 2) if the repeated field is a group with multiple fields, then its type is the element type 673 // and elements must be required. 674 // 3) if the repeated field is a group with one field AND is named either "array" or uses the 675 // LIST-annotated group's name with "_tuple" suffix, then the repeated type is the element 676 // type and the elements must be required. 677 // 4) otherwise, the repeated field's type is the element type with the repeated field's repetition 678 679 elemMustBeRequired := false 680 addSlice := false 681 var elemType reflect.Type 682 elemNode := gnode.fields[0] 683 switch { 684 case elemNode.Type() == Primitive, 685 elemNode.(*GroupNode).fields.Len() > 1, 686 elemNode.(*GroupNode).fields.Len() == 1 && (elemNode.Name() == "array" || elemNode.Name() == gnode.Name()+"_tuple"): 687 elemMustBeRequired = true 688 elemType = typeFromNode(elemNode) 689 default: 690 addSlice = true 691 elemType = typeFromNode(elemNode.(*GroupNode).fields[0]) 692 } 693 694 if elemMustBeRequired && elemType.Kind() == reflect.Ptr { 695 elemType = elemType.Elem() 696 } 697 if addSlice { 698 elemType = reflect.SliceOf(elemType) 699 } 700 if gnode.RepetitionType() == parquet.Repetitions.Optional { 701 elemType = reflect.PtrTo(elemType) 702 } 703 return elemType 704 case ConvertedTypes.Map, ConvertedTypes.MapKeyValue: 705 // According to the Parquet Spec, the outer-most level should be 706 // a group containing a single field named "key_value" with repetition 707 // either optional or required for whether or not the map is nullable. 708 // 709 // The key_value middle level *must* be a repeated group with a "key" field 710 // and *optionally* a "value" field 711 // 712 // the "key" field *must* be required and must always exist 713 // 714 // the "value" field can be required or optional or omitted. 715 // 716 // <map-repetition> group <name> (MAP) { 717 // repeated group key_value { 718 // required <key-type> key; 719 // <value-repetition> <value-type> value; 720 // } 721 // } 722 723 if gnode.fields.Len() != 1 { 724 panic("invalid map node, should have exactly 1 child") 725 } 726 727 if gnode.fields[0].Type() != Group { 728 panic("invalid map node, child should be a group node") 729 } 730 731 // that said, this may not be used in existing data and should not be 732 // enforced as errors when reading. 733 // 734 // some data may also incorrectly use MAP_KEY_VALUE instead of MAP 735 // 736 // so any group with MAP_KEY_VALUE that is not contained inside of a "MAP" 737 // group, should be considered equivalent to being a MAP group itself. 738 // 739 // in addition, the fields may not be called "key" and "value" in existing 740 // data, and as such should not be enforced as errors when reading. 741 742 keyval := gnode.fields[0].(*GroupNode) 743 744 keyIndex := keyval.FieldIndexByName("key") 745 if keyIndex == -1 { 746 keyIndex = 0 // use first child if there is no child named "key" 747 } 748 749 keyType := typeFromNode(keyval.fields[keyIndex]) 750 if keyType.Kind() == reflect.Ptr { 751 keyType = keyType.Elem() 752 } 753 // can't use a []byte as a key for a map, so use string 754 if keyType == reflect.TypeOf(parquet.ByteArray{}) || keyType == reflect.TypeOf(parquet.FixedLenByteArray{}) { 755 keyType = reflect.TypeOf(string("")) 756 } 757 758 // if the value node is omitted, then consider this a "set" and make it a 759 // map[key-type]bool 760 valType := reflect.TypeOf(true) 761 if keyval.fields.Len() > 1 { 762 valIndex := keyval.FieldIndexByName("value") 763 if valIndex == -1 { 764 valIndex = 1 // use second child if there is no child named "value" 765 } 766 767 valType = typeFromNode(keyval.fields[valIndex]) 768 } 769 770 mapType := reflect.MapOf(keyType, valType) 771 if gnode.RepetitionType() == parquet.Repetitions.Optional { 772 mapType = reflect.PtrTo(mapType) 773 } 774 return mapType 775 default: 776 fields := []reflect.StructField{} 777 for _, f := range gnode.fields { 778 fields = append(fields, reflect.StructField{ 779 Name: f.Name(), 780 Type: typeFromNode(f), 781 PkgPath: "parquet", 782 }) 783 } 784 785 structType := reflect.StructOf(fields) 786 if gnode.RepetitionType() == parquet.Repetitions.Repeated { 787 return reflect.SliceOf(structType) 788 } 789 if gnode.RepetitionType() == parquet.Repetitions.Optional { 790 return reflect.PtrTo(structType) 791 } 792 return structType 793 } 794 } 795 panic("what happened?") 796} 797 798// NewStructFromSchema generates a struct type as a reflect.Type from the schema 799// by using the appropriate physical types and making things either pointers or slices 800// based on whether they are repeated/optional/required. It does not use the logical 801// or converted types to change the physical storage so that it is more efficient to use 802// the resulting type for reading without having to do conversions. 803// 804// It will use maps for map types and slices for list types, but otherwise ignores the 805// converted and logical types of the nodes. Group nodes that are not List or Map will 806// be nested structs. 807func NewStructFromSchema(sc *Schema) (t reflect.Type, err error) { 808 defer func() { 809 if r := recover(); r != nil { 810 t = nil 811 switch x := r.(type) { 812 case string: 813 err = xerrors.New(x) 814 case error: 815 err = x 816 default: 817 err = xerrors.New("unknown panic") 818 } 819 } 820 }() 821 822 t = typeFromNode(sc.root) 823 if t.Kind() == reflect.Slice || t.Kind() == reflect.Ptr { 824 return t.Elem(), nil 825 } 826 return 827} 828