/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.io.serializer.avro;

import java.util.HashSet;
import java.util.Set;

import org.apache.avro.Schema;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

/**
 * Serialization for Avro Reflect classes. For a class to be accepted by this
 * serialization, it must either be in the package list configured via
 * <code>avro.reflect.pkgs</code> or implement the
 * {@link AvroReflectSerializable} interface.
 */
@SuppressWarnings("unchecked")
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class AvroReflectSerialization extends AvroSerialization<Object>{

  /**
   * Key to configure packages that contain classes to be serialized and
   * deserialized using this class. Multiple packages can be specified using
   * a comma-separated list.
   */
  @InterfaceAudience.Private
  public static final String AVRO_REFLECT_PACKAGES = "avro.reflect.pkgs";

  /**
   * Package names read from {@link #AVRO_REFLECT_PACKAGES}. Stays
   * <code>null</code> until the first call to {@link #accept(Class)}, which
   * populates it once; it is not refreshed afterwards.
   */
  private Set<String> packages;

  /**
   * Decides whether this serialization handles the given class.
   *
   * @param c the class to test
   * @return <code>true</code> if <code>c</code> implements
   *         {@link AvroReflectSerializable}, or if its package name is one of
   *         the configured packages; <code>false</code> otherwise, including
   *         when <code>c.getPackage()</code> returns <code>null</code>
   */
  @InterfaceAudience.Private
  @Override
  public synchronized boolean accept(Class<?> c) {
    // Load the configured package list on first use only.
    if (packages == null) {
      getPackages();
    }
    if (AvroReflectSerializable.class.isAssignableFrom(c)) {
      return true;
    }
    Package owner = c.getPackage();
    return owner != null && packages.contains(owner.getName());
  }

  /**
   * Reads {@link #AVRO_REFLECT_PACKAGES} from the configuration and stores the
   * trimmed, non-empty entries in {@link #packages}. A missing key yields an
   * empty set.
   */
  private void getPackages() {
    String[] pkgList = getConf().getStrings(AVRO_REFLECT_PACKAGES);
    Set<String> configured = new HashSet<String>();
    if (pkgList != null) {
      for (String pkg : pkgList) {
        String name = pkg.trim();
        // A whitespace-only entry would otherwise be stored as "", which is
        // the name of the unnamed package, and accept() would then match
        // classes that live in the default package.
        if (!name.isEmpty()) {
          configured.add(name);
        }
      }
    }
    packages = configured;
  }

  /**
   * Creates a reflect-based reader for the given class.
   *
   * @param clazz the class whose instances will be read
   * @return a new {@link ReflectDatumReader} built from <code>clazz</code>
   * @throws RuntimeException wrapping any exception raised while constructing
   *         the reader
   */
  @InterfaceAudience.Private
  @Override
  public DatumReader getReader(Class<Object> clazz) {
    try {
      return new ReflectDatumReader(clazz);
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Derives the Avro schema for an object from its runtime class.
   *
   * @param t the object to describe; must not be <code>null</code>
   * @return the schema that {@link ReflectData} produces for
   *         <code>t.getClass()</code>
   */
  @InterfaceAudience.Private
  @Override
  public Schema getSchema(Object t) {
    return ReflectData.get().getSchema(t.getClass());
  }

  /**
   * Creates a reflect-based writer.
   *
   * @param clazz unused by this implementation
   * @return a new {@link ReflectDatumWriter} constructed without a schema;
   *         presumably the caller sets the schema before writing (confirm in
   *         {@link AvroSerialization})
   */
  @InterfaceAudience.Private
  @Override
  public DatumWriter getWriter(Class<Object> clazz) {
    return new ReflectDatumWriter();
  }

}