1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.io.serializer.avro;
20 
21 import java.util.HashSet;
22 import java.util.Set;
23 
24 import org.apache.avro.Schema;
25 import org.apache.avro.io.DatumReader;
26 import org.apache.avro.io.DatumWriter;
27 import org.apache.avro.reflect.ReflectData;
28 import org.apache.avro.reflect.ReflectDatumReader;
29 import org.apache.avro.reflect.ReflectDatumWriter;
30 import org.apache.hadoop.classification.InterfaceAudience;
31 import org.apache.hadoop.classification.InterfaceStability;
32 
33 /**
34  * Serialization for Avro Reflect classes. For a class to be accepted by this
35  * serialization, it must either be in the package list configured via
36  * <code>avro.reflect.pkgs</code> or implement
37  * {@link AvroReflectSerializable} interface.
38  *
39  */
40 @SuppressWarnings("unchecked")
41 @InterfaceAudience.Public
42 @InterfaceStability.Evolving
43 public class AvroReflectSerialization extends AvroSerialization<Object>{
44 
45   /**
46    * Key to configure packages that contain classes to be serialized and
47    * deserialized using this class. Multiple packages can be specified using
48    * comma-separated list.
49    */
50   @InterfaceAudience.Private
51   public static final String AVRO_REFLECT_PACKAGES = "avro.reflect.pkgs";
52 
53   private Set<String> packages;
54 
55   @InterfaceAudience.Private
56   @Override
accept(Class<?> c)57   public synchronized boolean accept(Class<?> c) {
58     if (packages == null) {
59       getPackages();
60     }
61     return AvroReflectSerializable.class.isAssignableFrom(c) ||
62       (c.getPackage() != null && packages.contains(c.getPackage().getName()));
63   }
64 
getPackages()65   private void getPackages() {
66     String[] pkgList  = getConf().getStrings(AVRO_REFLECT_PACKAGES);
67     packages = new HashSet<String>();
68     if (pkgList != null) {
69       for (String pkg : pkgList) {
70         packages.add(pkg.trim());
71       }
72     }
73   }
74 
75   @InterfaceAudience.Private
76   @Override
getReader(Class<Object> clazz)77   public DatumReader getReader(Class<Object> clazz) {
78     try {
79       return new ReflectDatumReader(clazz);
80     } catch (Exception e) {
81       throw new RuntimeException(e);
82     }
83   }
84 
85   @InterfaceAudience.Private
86   @Override
getSchema(Object t)87   public Schema getSchema(Object t) {
88     return ReflectData.get().getSchema(t.getClass());
89   }
90 
91   @InterfaceAudience.Private
92   @Override
getWriter(Class<Object> clazz)93   public DatumWriter getWriter(Class<Object> clazz) {
94     return new ReflectDatumWriter();
95   }
96 
97 }
98