1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 #include "csvarchive.hh"
20 #include <stdlib.h>
21 
22 using namespace hadoop;
23 
readUptoTerminator(PushBackInStream & stream)24 static std::string readUptoTerminator(PushBackInStream& stream)
25 {
26   std::string s;
27   while (1) {
28     char c;
29     if (1 != stream.read(&c, 1)) {
30       throw new IOException("Error in deserialization.");
31     }
32     if (c == ',' || c == '\n' || c == '}') {
33       if (c != ',') {
34         stream.pushBack(c);
35       }
36       break;
37     }
38     s.push_back(c);
39   }
40   return s;
41 }
42 
deserialize(int8_t & t,const char * tag)43 void hadoop::ICsvArchive::deserialize(int8_t& t, const char* tag)
44 {
45   std::string s = readUptoTerminator(stream);
46   t = (int8_t) strtol(s.c_str(), NULL, 10);
47 }
48 
deserialize(bool & t,const char * tag)49 void hadoop::ICsvArchive::deserialize(bool& t, const char* tag)
50 {
51   std::string s = readUptoTerminator(stream);
52   t = (s == "T") ? true : false;
53 }
54 
deserialize(int32_t & t,const char * tag)55 void hadoop::ICsvArchive::deserialize(int32_t& t, const char* tag)
56 {
57   std::string s = readUptoTerminator(stream);
58   t = strtol(s.c_str(), NULL, 10);
59 }
60 
deserialize(int64_t & t,const char * tag)61 void hadoop::ICsvArchive::deserialize(int64_t& t, const char* tag)
62 {
63   std::string s = readUptoTerminator(stream);
64   t = strtoll(s.c_str(), NULL, 10);
65 }
66 
deserialize(float & t,const char * tag)67 void hadoop::ICsvArchive::deserialize(float& t, const char* tag)
68 {
69   std::string s = readUptoTerminator(stream);
70   t = strtof(s.c_str(), NULL);
71 }
72 
deserialize(double & t,const char * tag)73 void hadoop::ICsvArchive::deserialize(double& t, const char* tag)
74 {
75   std::string s = readUptoTerminator(stream);
76   t = strtod(s.c_str(), NULL);
77 }
78 
deserialize(std::string & t,const char * tag)79 void hadoop::ICsvArchive::deserialize(std::string& t, const char* tag)
80 {
81   std::string temp = readUptoTerminator(stream);
82   if (temp[0] != '\'') {
83     throw new IOException("Errror deserializing string.");
84   }
85   t.clear();
86   // skip first character, replace escaped characters
87   int len = temp.length();
88   for (int i = 1; i < len; i++) {
89     char c = temp.at(i);
90     if (c == '%') {
91       // since we escape '%', there have to be at least two chars following a '%'
92       char ch1 = temp.at(i+1);
93       char ch2 = temp.at(i+2);
94       i += 2;
95 	  if (ch1 == '0' && ch2 == '0') {
96 	    t.append(1, '\0');
97 	  } else if (ch1 == '0' && ch2 == 'A') {
98 	    t.append(1, '\n');
99 	  } else if (ch1 == '0' && ch2 == 'D') {
100 	    t.append(1, '\r');
101 	  } else if (ch1 == '2' && ch2 == 'C') {
102 	    t.append(1, ',');
103 	  } else if (ch1 == '7' && ch2 == 'D') {
104 	    t.append(1, '}');
105 	  } else if (ch1 == '2' && ch2 == '5') {
106 	    t.append(1, '%');
107 	  } else {
108 	    throw new IOException("Error deserializing string.");
109 	  }
110     }
111     else {
112       t.append(1, c);
113     }
114   }
115 }
116 
deserialize(std::string & t,size_t & len,const char * tag)117 void hadoop::ICsvArchive::deserialize(std::string& t, size_t& len, const char* tag)
118 {
119   std::string s = readUptoTerminator(stream);
120   if (s[0] != '#') {
121     throw new IOException("Errror deserializing buffer.");
122   }
123   s.erase(0, 1); /// erase first character
124   len = s.length();
125   if (len%2 == 1) { // len is guaranteed to be even
126     throw new IOException("Errror deserializing buffer.");
127   }
128   len = len >> 1;
129   for (size_t idx = 0; idx < len; idx++) {
130     char buf[3];
131     buf[0] = s[2*idx];
132     buf[1] = s[2*idx+1];
133     buf[2] = '\0';
134     int i;
135     if (1 != sscanf(buf, "%2x", &i)) {
136       throw new IOException("Errror deserializing buffer.");
137     }
138     t.push_back((char) i);
139   }
140   len = t.length();
141 }
142 
startRecord(Record & s,const char * tag)143 void hadoop::ICsvArchive::startRecord(Record& s, const char* tag)
144 {
145   if (tag != NULL) {
146     char mark[2];
147     if (2 != stream.read(mark, 2)) {
148       throw new IOException("Error deserializing record.");
149     }
150     if (mark[0] != 's' || mark[1] != '{') {
151       throw new IOException("Error deserializing record.");
152     }
153   }
154 }
155 
endRecord(Record & s,const char * tag)156 void hadoop::ICsvArchive::endRecord(Record& s, const char* tag)
157 {
158   char mark;
159   if (1 != stream.read(&mark, 1)) {
160     throw new IOException("Error deserializing record.");
161   }
162   if (tag == NULL) {
163     if (mark != '\n') {
164       throw new IOException("Error deserializing record.");
165     }
166   } else if (mark != '}') {
167     throw new IOException("Error deserializing record.");
168   } else {
169     readUptoTerminator(stream);
170   }
171 }
172 
startVector(const char * tag)173 Index* hadoop::ICsvArchive::startVector(const char* tag)
174 {
175   char mark[2];
176   if (2 != stream.read(mark, 2)) {
177     throw new IOException("Error deserializing vector.");
178   }
179   if (mark[0] != 'v' || mark[1] != '{') {
180     throw new IOException("Error deserializing vector.");
181   }
182   return new CsvIndex(stream);
183 }
184 
endVector(Index * idx,const char * tag)185 void hadoop::ICsvArchive::endVector(Index* idx, const char* tag)
186 {
187   delete idx;
188   char mark;
189   if (1 != stream.read(&mark, 1)) {
190     throw new IOException("Error deserializing vector.");
191   }
192   if (mark != '}') {
193     throw new IOException("Error deserializing vector.");
194   }
195   readUptoTerminator(stream);
196 }
197 
startMap(const char * tag)198 Index* hadoop::ICsvArchive::startMap(const char* tag)
199 {
200   char mark[2];
201   if (2 != stream.read(mark, 2)) {
202     throw new IOException("Error deserializing map.");
203   }
204   if (mark[0] != 'm' || mark[1] != '{') {
205     throw new IOException("Error deserializing map.");
206   }
207 
208   return new CsvIndex(stream);
209 }
210 
endMap(Index * idx,const char * tag)211 void hadoop::ICsvArchive::endMap(Index* idx, const char* tag)
212 {
213   delete idx;
214   char mark;
215   if (1 != stream.read(&mark, 1)) {
216     throw new IOException("Error deserializing map.");
217   }
218   if (mark != '}') {
219     throw new IOException("Error deserializing map.");
220   }
221   readUptoTerminator(stream);
222 }
223 
// Destructor: intentionally empty — this archive performs no cleanup of
// its own (the underlying stream's lifetime is managed elsewhere —
// NOTE(review): ownership not visible here; confirm against csvarchive.hh).
hadoop::ICsvArchive::~ICsvArchive()
{
}
227 
serialize(int8_t t,const char * tag)228 void hadoop::OCsvArchive::serialize(int8_t t, const char* tag)
229 {
230   printCommaUnlessFirst();
231   char sval[5];
232   sprintf(sval, "%d", t);
233   stream.write(sval, strlen(sval));
234 }
235 
serialize(bool t,const char * tag)236 void hadoop::OCsvArchive::serialize(bool t, const char* tag)
237 {
238   printCommaUnlessFirst();
239   const char *sval = t ? "T" : "F";
240   stream.write(sval,1);
241 }
242 
serialize(int32_t t,const char * tag)243 void hadoop::OCsvArchive::serialize(int32_t t, const char* tag)
244 {
245   printCommaUnlessFirst();
246   char sval[128];
247   sprintf(sval, "%d", t);
248   stream.write(sval, strlen(sval));
249 }
250 
serialize(int64_t t,const char * tag)251 void hadoop::OCsvArchive::serialize(int64_t t, const char* tag)
252 {
253   printCommaUnlessFirst();
254   char sval[128];
255   sprintf(sval, "%lld", t);
256   stream.write(sval, strlen(sval));
257 }
258 
serialize(float t,const char * tag)259 void hadoop::OCsvArchive::serialize(float t, const char* tag)
260 {
261   printCommaUnlessFirst();
262   char sval[128];
263   sprintf(sval, "%f", t);
264   stream.write(sval, strlen(sval));
265 }
266 
serialize(double t,const char * tag)267 void hadoop::OCsvArchive::serialize(double t, const char* tag)
268 {
269   printCommaUnlessFirst();
270   char sval[128];
271   sprintf(sval, "%lf", t);
272   stream.write(sval, strlen(sval));
273 }
274 
serialize(const std::string & t,const char * tag)275 void hadoop::OCsvArchive::serialize(const std::string& t, const char* tag)
276 {
277   printCommaUnlessFirst();
278   stream.write("'",1);
279   int len = t.length();
280   for (int idx = 0; idx < len; idx++) {
281     char c = t[idx];
282     switch(c) {
283       case '\0':
284         stream.write("%00",3);
285         break;
286       case 0x0A:
287         stream.write("%0A",3);
288         break;
289       case 0x0D:
290         stream.write("%0D",3);
291         break;
292       case 0x25:
293         stream.write("%25",3);
294         break;
295       case 0x2C:
296         stream.write("%2C",3);
297         break;
298       case 0x7D:
299         stream.write("%7D",3);
300         break;
301       default:
302         stream.write(&c,1);
303         break;
304     }
305   }
306 }
307 
serialize(const std::string & t,size_t len,const char * tag)308 void hadoop::OCsvArchive::serialize(const std::string& t, size_t len, const char* tag)
309 {
310   printCommaUnlessFirst();
311   stream.write("#",1);
312   for(size_t idx = 0; idx < len; idx++) {
313     uint8_t b = t[idx];
314     char sval[3];
315     sprintf(sval,"%2x",b);
316     stream.write(sval, 2);
317   }
318 }
319 
startRecord(const Record & s,const char * tag)320 void hadoop::OCsvArchive::startRecord(const Record& s, const char* tag)
321 {
322   printCommaUnlessFirst();
323   if (tag != NULL && strlen(tag) != 0) {
324     stream.write("s{",2);
325   }
326   isFirst = true;
327 }
328 
endRecord(const Record & s,const char * tag)329 void hadoop::OCsvArchive::endRecord(const Record& s, const char* tag)
330 {
331   if (tag == NULL || strlen(tag) == 0) {
332     stream.write("\n",1);
333     isFirst = true;
334   } else {
335     stream.write("}",1);
336     isFirst = false;
337   }
338 }
339 
startVector(size_t len,const char * tag)340 void hadoop::OCsvArchive::startVector(size_t len, const char* tag)
341 {
342   printCommaUnlessFirst();
343   stream.write("v{",2);
344   isFirst = true;
345 }
346 
endVector(size_t len,const char * tag)347 void hadoop::OCsvArchive::endVector(size_t len, const char* tag)
348 {
349   stream.write("}",1);
350   isFirst = false;
351 }
352 
startMap(size_t len,const char * tag)353 void hadoop::OCsvArchive::startMap(size_t len, const char* tag)
354 {
355   printCommaUnlessFirst();
356   stream.write("m{",2);
357   isFirst = true;
358 }
359 
endMap(size_t len,const char * tag)360 void hadoop::OCsvArchive::endMap(size_t len, const char* tag)
361 {
362   stream.write("}",1);
363   isFirst = false;
364 }
365 
// Destructor: intentionally empty — this archive performs no cleanup of
// its own (the underlying stream's lifetime is managed elsewhere —
// NOTE(review): ownership not visible here; confirm against csvarchive.hh).
hadoop::OCsvArchive::~OCsvArchive()
{
}
369