/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.record;

import java.io.EOFException;
import java.io.InputStreamReader;
import java.io.InputStream;
import java.io.IOException;
import java.io.PushbackReader;
import java.io.UnsupportedEncodingException;

/**
 * {@link RecordInput} implementation that deserializes values from a
 * comma-separated text stream decoded as UTF-8.
 *
 * <p>The grammar recognized by this class, as implemented below:
 * <ul>
 *   <li>scalar fields are runs of characters terminated by <code>','</code>
 *       (consumed) or by <code>'}'</code>, <code>'\n'</code> or
 *       <code>'\r'</code> (left in the stream);</li>
 *   <li>a nested record is wrapped in <code>s{ ... }</code>, a vector in
 *       <code>v{ ... }</code> and a map in <code>m{ ... }</code>, each
 *       optionally followed by a <code>','</code> separator;</li>
 *   <li>a record read with a <code>null</code> or empty tag has no opening
 *       marker and is terminated by a single <code>'\n'</code> or
 *       <code>'\r'</code>.</li>
 * </ul>
 */
public class CsvRecordInput implements RecordInput {

  /** Character source; the default one-character pushback is used for look-ahead. */
  private final PushbackReader stream;

  /**
   * Index over the elements of a vector or map. There is no element count in
   * the stream, so completion is detected by peeking for the closing brace.
   */
  private class CsvIndex implements Index {
    /**
     * Peeks at the next character without consuming it.
     *
     * @return <code>true</code> if the next character is <code>'}'</code>;
     *         <code>false</code> otherwise, including at end of stream or
     *         when the peek itself fails
     */
    public boolean done() {
      int c = -1;
      try {
        c = stream.read();
        if (c != -1) {
          // Never push the EOF sentinel back: cast to char it would become
          // a phantom '\uffff' that later reads would see as real data.
          stream.unread(c);
        }
      } catch (IOException ignored) {
        // Best effort: this method declares no checked exception. Reporting
        // "not done" makes the caller attempt the next read, which is where
        // a persistent stream failure will surface.
      }
      return c == '}';
    }

    /** No-op: the position advances implicitly as elements are read. */
    public void incr() {}
  }

  /**
   * Always throws the standard deserialization failure for a field.
   *
   * @param tag name of the field being deserialized, used in the message
   * @throws IOException always
   */
  private void throwExceptionOnError(String tag) throws IOException {
    throw new IOException("Error deserializing "+tag);
  }

  /**
   * Builds an IOException that carries its cause. <code>initCause</code> is
   * used rather than the two-argument constructor so that nothing newer than
   * the rest of this file is required from the class library.
   */
  private static IOException ioError(String message, Throwable cause) {
    IOException e = new IOException(message);
    e.initCause(cause);
    return e;
  }

  /**
   * Reads the raw text of the next field.
   *
   * <p>A terminating <code>','</code> is consumed; a terminating
   * <code>'}'</code>, <code>'\n'</code> or <code>'\r'</code> is pushed back
   * so the enclosing end-of-record/vector/map handling can see it.
   *
   * @param tag name of the field, used in error messages
   * @return the characters read before the terminator (possibly empty)
   * @throws IOException if the stream fails or ends before a terminator is
   *         seen; the underlying exception is attached as the cause
   */
  private String readField(String tag) throws IOException {
    try {
      StringBuffer buf = new StringBuffer();
      while (true) {
        // Keep the value as an int so that end of stream (-1) can be told
        // apart from data; without this check truncated input would be
        // appended as '\uffff' forever.
        int c = stream.read();
        switch (c) {
          case -1:
            throw new EOFException("Unexpected end of input");
          case ',':
            return buf.toString();
          case '}':
          case '\n':
          case '\r':
            stream.unread(c);
            return buf.toString();
          default:
            buf.append((char) c);
        }
      }
    } catch (IOException ex) {
      throw ioError("Error reading "+tag, ex);
    }
  }

  /**
   * Consumes a two-character opening marker: a type letter followed by
   * <code>'{'</code>. Both characters are read before either is checked.
   *
   * @param kind expected type letter
   * @param tag name of the field, used in the error message
   * @throws IOException if the next two characters are not the marker
   */
  private void expectOpen(char kind, String tag) throws IOException {
    int c1 = stream.read();
    int c2 = stream.read();
    if (c1 != kind || c2 != '{') {
      throwExceptionOnError(tag);
    }
  }

  /**
   * Consumes a closing <code>'}'</code> and the separator that may follow it.
   *
   * @param tag name of the field, used in the error message
   * @throws IOException if the next character is not <code>'}'</code>
   */
  private void expectClose(String tag) throws IOException {
    if (stream.read() != '}') {
      throwExceptionOnError(tag);
    }
    skipSeparator();
  }

  /**
   * Consumes one <code>','</code> if it is the next character. Any other
   * character is pushed back; at end of stream nothing is pushed back.
   */
  private void skipSeparator() throws IOException {
    int c = stream.read();
    if (c != ',' && c != -1) {
      stream.unread(c);
    }
  }

  /**
   * Creates a new instance of CsvRecordInput.
   *
   * @param in byte stream to read from; decoded as UTF-8
   */
  public CsvRecordInput(InputStream in) {
    try {
      stream = new PushbackReader(new InputStreamReader(in, "UTF-8"));
    } catch (UnsupportedEncodingException ex) {
      throw new RuntimeException(ex);
    }
  }

  /**
   * Reads a byte field. The value is parsed as a long and narrowed with a
   * cast, so out-of-range values are truncated rather than rejected.
   *
   * @param tag name of the field, used in error messages
   * @throws IOException if the field cannot be read or parsed
   */
  public byte readByte(String tag) throws IOException {
    return (byte) readLong(tag);
  }

  /**
   * Reads a boolean field.
   *
   * @param tag name of the field, used in error messages
   * @return <code>true</code> only if the field text is exactly
   *         <code>"T"</code>; any other text yields <code>false</code>
   * @throws IOException if the field cannot be read
   */
  public boolean readBool(String tag) throws IOException {
    String sval = readField(tag);
    return "T".equals(sval);
  }

  /**
   * Reads an int field. The value is parsed as a long and narrowed with a
   * cast, so out-of-range values are truncated rather than rejected.
   *
   * @param tag name of the field, used in error messages
   * @throws IOException if the field cannot be read or parsed
   */
  public int readInt(String tag) throws IOException {
    return (int) readLong(tag);
  }

  /**
   * Reads a long field in the decimal form accepted by
   * {@link Long#parseLong(String)}.
   *
   * @param tag name of the field, used in error messages
   * @throws IOException if the field cannot be read or is not a valid long;
   *         the parse failure is attached as the cause
   */
  public long readLong(String tag) throws IOException {
    String sval = readField(tag);
    try {
      return Long.parseLong(sval);
    } catch (NumberFormatException ex) {
      throw ioError("Error deserializing "+tag, ex);
    }
  }

  /**
   * Reads a float field by parsing it as a double and narrowing the result.
   *
   * @param tag name of the field, used in error messages
   * @throws IOException if the field cannot be read or parsed
   */
  public float readFloat(String tag) throws IOException {
    return (float) readDouble(tag);
  }

  /**
   * Reads a double field in the form accepted by
   * {@link Double#parseDouble(String)}.
   *
   * @param tag name of the field, used in error messages
   * @throws IOException if the field cannot be read or is not a valid
   *         double; the parse failure is attached as the cause
   */
  public double readDouble(String tag) throws IOException {
    String sval = readField(tag);
    try {
      return Double.parseDouble(sval);
    } catch (NumberFormatException ex) {
      throw ioError("Error deserializing "+tag, ex);
    }
  }

  /**
   * Reads a string field and decodes it with
   * <code>Utils.fromCSVString</code>.
   *
   * @param tag name of the field, used in error messages
   * @throws IOException if the field cannot be read or decoded
   */
  public String readString(String tag) throws IOException {
    String sval = readField(tag);
    return Utils.fromCSVString(sval);
  }

  /**
   * Reads a buffer field and decodes it with
   * <code>Utils.fromCSVBuffer</code>.
   *
   * @param tag name of the field, used in error messages
   * @throws IOException if the field cannot be read or decoded
   */
  public Buffer readBuffer(String tag) throws IOException {
    String sval = readField(tag);
    return Utils.fromCSVBuffer(sval);
  }

  /**
   * Begins reading a record. When <code>tag</code> is <code>null</code> or
   * empty nothing is consumed; otherwise the marker <code>s{</code> must
   * come next.
   *
   * @param tag name of the record field, or <code>null</code>/empty
   * @throws IOException if the opening marker is required but missing
   */
  public void startRecord(String tag) throws IOException {
    if (tag != null && !"".equals(tag)) {
      expectOpen('s', tag);
    }
  }

  /**
   * Finishes reading a record. With a <code>null</code> or empty tag the
   * next character must be <code>'\n'</code> or <code>'\r'</code>; otherwise
   * it must be <code>'}'</code>, optionally followed by a <code>','</code>.
   *
   * @param tag name of the record field, or <code>null</code>/empty
   * @throws IOException if the expected terminator is not found
   */
  public void endRecord(String tag) throws IOException {
    if (tag == null || "".equals(tag)) {
      int c = stream.read();
      if (c != '\n' && c != '\r') {
        throw new IOException("Error deserializing record.");
      }
      return;
    }
    expectClose(tag);
  }

  /**
   * Begins reading a vector, which must open with <code>v{</code>.
   *
   * @param tag name of the field, used in error messages
   * @return an index whose <code>done()</code> reports when the closing
   *         brace is next in the stream
   * @throws IOException if the opening marker is missing
   */
  public Index startVector(String tag) throws IOException {
    expectOpen('v', tag);
    return new CsvIndex();
  }

  /**
   * Finishes reading a vector: consumes <code>'}'</code> and an optional
   * following <code>','</code>.
   *
   * @param tag name of the field, used in error messages
   * @throws IOException if the closing brace is missing
   */
  public void endVector(String tag) throws IOException {
    expectClose(tag);
  }

  /**
   * Begins reading a map, which must open with <code>m{</code>.
   *
   * @param tag name of the field, used in error messages
   * @return an index whose <code>done()</code> reports when the closing
   *         brace is next in the stream
   * @throws IOException if the opening marker is missing
   */
  public Index startMap(String tag) throws IOException {
    expectOpen('m', tag);
    return new CsvIndex();
  }

  /**
   * Finishes reading a map: consumes <code>'}'</code> and an optional
   * following <code>','</code>.
   *
   * @param tag name of the field, used in error messages
   * @throws IOException if the closing brace is missing
   */
  public void endMap(String tag) throws IOException {
    expectClose(tag);
  }
}