1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 package org.apache.hadoop.record;
20 
21 import java.io.InputStreamReader;
22 import java.io.InputStream;
23 import java.io.IOException;
24 import java.io.PushbackReader;
25 import java.io.UnsupportedEncodingException;
26 
27 /**
28  */
29 public class CsvRecordInput implements RecordInput {
30 
31   private PushbackReader stream;
32 
33   private class CsvIndex implements Index {
done()34     public boolean done() {
35       char c = '\0';
36       try {
37         c = (char) stream.read();
38         stream.unread(c);
39       } catch (IOException ex) {
40       }
41       return (c == '}') ? true : false;
42     }
incr()43     public void incr() {}
44   }
45 
throwExceptionOnError(String tag)46   private void throwExceptionOnError(String tag) throws IOException {
47     throw new IOException("Error deserializing "+tag);
48   }
49 
readField(String tag)50   private String readField(String tag) throws IOException {
51     try {
52       StringBuffer buf = new StringBuffer();
53       while (true) {
54         char c = (char) stream.read();
55         switch (c) {
56         case ',':
57           return buf.toString();
58         case '}':
59         case '\n':
60         case '\r':
61           stream.unread(c);
62           return buf.toString();
63         default:
64           buf.append(c);
65         }
66       }
67     } catch (IOException ex) {
68       throw new IOException("Error reading "+tag);
69     }
70   }
71 
72   /** Creates a new instance of CsvRecordInput */
CsvRecordInput(InputStream in)73   public CsvRecordInput(InputStream in) {
74     try {
75       stream = new PushbackReader(new InputStreamReader(in, "UTF-8"));
76     } catch (UnsupportedEncodingException ex) {
77       throw new RuntimeException(ex);
78     }
79   }
80 
readByte(String tag)81   public byte readByte(String tag) throws IOException {
82     return (byte) readLong(tag);
83   }
84 
readBool(String tag)85   public boolean readBool(String tag) throws IOException {
86     String sval = readField(tag);
87     return "T".equals(sval) ? true : false;
88   }
89 
readInt(String tag)90   public int readInt(String tag) throws IOException {
91     return (int) readLong(tag);
92   }
93 
readLong(String tag)94   public long readLong(String tag) throws IOException {
95     String sval = readField(tag);
96     try {
97       long lval = Long.parseLong(sval);
98       return lval;
99     } catch (NumberFormatException ex) {
100       throw new IOException("Error deserializing "+tag);
101     }
102   }
103 
readFloat(String tag)104   public float readFloat(String tag) throws IOException {
105     return (float) readDouble(tag);
106   }
107 
readDouble(String tag)108   public double readDouble(String tag) throws IOException {
109     String sval = readField(tag);
110     try {
111       double dval = Double.parseDouble(sval);
112       return dval;
113     } catch (NumberFormatException ex) {
114       throw new IOException("Error deserializing "+tag);
115     }
116   }
117 
readString(String tag)118   public String readString(String tag) throws IOException {
119     String sval = readField(tag);
120     return Utils.fromCSVString(sval);
121   }
122 
readBuffer(String tag)123   public Buffer readBuffer(String tag) throws IOException {
124     String sval = readField(tag);
125     return Utils.fromCSVBuffer(sval);
126   }
127 
startRecord(String tag)128   public void startRecord(String tag) throws IOException {
129     if (tag != null && !"".equals(tag)) {
130       char c1 = (char) stream.read();
131       char c2 = (char) stream.read();
132       if (c1 != 's' || c2 != '{') {
133         throw new IOException("Error deserializing "+tag);
134       }
135     }
136   }
137 
endRecord(String tag)138   public void endRecord(String tag) throws IOException {
139     char c = (char) stream.read();
140     if (tag == null || "".equals(tag)) {
141       if (c != '\n' && c != '\r') {
142         throw new IOException("Error deserializing record.");
143       } else {
144         return;
145       }
146     }
147 
148     if (c != '}') {
149       throw new IOException("Error deserializing "+tag);
150     }
151     c = (char) stream.read();
152     if (c != ',') {
153       stream.unread(c);
154     }
155 
156     return;
157   }
158 
startVector(String tag)159   public Index startVector(String tag) throws IOException {
160     char c1 = (char) stream.read();
161     char c2 = (char) stream.read();
162     if (c1 != 'v' || c2 != '{') {
163       throw new IOException("Error deserializing "+tag);
164     }
165     return new CsvIndex();
166   }
167 
endVector(String tag)168   public void endVector(String tag) throws IOException {
169     char c = (char) stream.read();
170     if (c != '}') {
171       throw new IOException("Error deserializing "+tag);
172     }
173     c = (char) stream.read();
174     if (c != ',') {
175       stream.unread(c);
176     }
177     return;
178   }
179 
startMap(String tag)180   public Index startMap(String tag) throws IOException {
181     char c1 = (char) stream.read();
182     char c2 = (char) stream.read();
183     if (c1 != 'm' || c2 != '{') {
184       throw new IOException("Error deserializing "+tag);
185     }
186     return new CsvIndex();
187   }
188 
endMap(String tag)189   public void endMap(String tag) throws IOException {
190     char c = (char) stream.read();
191     if (c != '}') {
192       throw new IOException("Error deserializing "+tag);
193     }
194     c = (char) stream.read();
195     if (c != ',') {
196       stream.unread(c);
197     }
198     return;
199   }
200 }
201