1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.io.serializer;
20 
21 import java.io.DataInputStream;
22 import java.io.DataOutputStream;
23 import java.io.IOException;
24 import java.io.InputStream;
25 import java.io.OutputStream;
26 import org.apache.hadoop.classification.InterfaceAudience;
27 import org.apache.hadoop.classification.InterfaceStability;
28 import org.apache.hadoop.conf.Configuration;
29 import org.apache.hadoop.conf.Configured;
30 import org.apache.hadoop.io.Writable;
31 import org.apache.hadoop.util.ReflectionUtils;
32 
33 /**
34  * A {@link Serialization} for {@link Writable}s that delegates to
35  * {@link Writable#write(java.io.DataOutput)} and
36  * {@link Writable#readFields(java.io.DataInput)}.
37  */
38 @InterfaceAudience.Public
39 @InterfaceStability.Evolving
40 public class WritableSerialization extends Configured
41 	implements Serialization<Writable> {
42   static class WritableDeserializer extends Configured
43   	implements Deserializer<Writable> {
44 
45     private Class<?> writableClass;
46     private DataInputStream dataIn;
47 
WritableDeserializer(Configuration conf, Class<?> c)48     public WritableDeserializer(Configuration conf, Class<?> c) {
49       setConf(conf);
50       this.writableClass = c;
51     }
52 
53     @Override
open(InputStream in)54     public void open(InputStream in) {
55       if (in instanceof DataInputStream) {
56         dataIn = (DataInputStream) in;
57       } else {
58         dataIn = new DataInputStream(in);
59       }
60     }
61 
62     @Override
deserialize(Writable w)63     public Writable deserialize(Writable w) throws IOException {
64       Writable writable;
65       if (w == null) {
66         writable
67           = (Writable) ReflectionUtils.newInstance(writableClass, getConf());
68       } else {
69         writable = w;
70       }
71       writable.readFields(dataIn);
72       return writable;
73     }
74 
75     @Override
close()76     public void close() throws IOException {
77       dataIn.close();
78     }
79 
80   }
81 
82   static class WritableSerializer extends Configured implements
83   	Serializer<Writable> {
84 
85     private DataOutputStream dataOut;
86 
87     @Override
open(OutputStream out)88     public void open(OutputStream out) {
89       if (out instanceof DataOutputStream) {
90         dataOut = (DataOutputStream) out;
91       } else {
92         dataOut = new DataOutputStream(out);
93       }
94     }
95 
96     @Override
serialize(Writable w)97     public void serialize(Writable w) throws IOException {
98       w.write(dataOut);
99     }
100 
101     @Override
close()102     public void close() throws IOException {
103       dataOut.close();
104     }
105 
106   }
107 
108   @InterfaceAudience.Private
109   @Override
accept(Class<?> c)110   public boolean accept(Class<?> c) {
111     return Writable.class.isAssignableFrom(c);
112   }
113 
114   @InterfaceAudience.Private
115   @Override
getSerializer(Class<Writable> c)116   public Serializer<Writable> getSerializer(Class<Writable> c) {
117     return new WritableSerializer();
118   }
119 
120   @InterfaceAudience.Private
121   @Override
getDeserializer(Class<Writable> c)122   public Deserializer<Writable> getDeserializer(Class<Writable> c) {
123     return new WritableDeserializer(getConf(), c);
124   }
125 
126 }
127