1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18
#include "csvarchive.hh"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
21
22 using namespace hadoop;
23
readUptoTerminator(PushBackInStream & stream)24 static std::string readUptoTerminator(PushBackInStream& stream)
25 {
26 std::string s;
27 while (1) {
28 char c;
29 if (1 != stream.read(&c, 1)) {
30 throw new IOException("Error in deserialization.");
31 }
32 if (c == ',' || c == '\n' || c == '}') {
33 if (c != ',') {
34 stream.pushBack(c);
35 }
36 break;
37 }
38 s.push_back(c);
39 }
40 return s;
41 }
42
deserialize(int8_t & t,const char * tag)43 void hadoop::ICsvArchive::deserialize(int8_t& t, const char* tag)
44 {
45 std::string s = readUptoTerminator(stream);
46 t = (int8_t) strtol(s.c_str(), NULL, 10);
47 }
48
deserialize(bool & t,const char * tag)49 void hadoop::ICsvArchive::deserialize(bool& t, const char* tag)
50 {
51 std::string s = readUptoTerminator(stream);
52 t = (s == "T") ? true : false;
53 }
54
deserialize(int32_t & t,const char * tag)55 void hadoop::ICsvArchive::deserialize(int32_t& t, const char* tag)
56 {
57 std::string s = readUptoTerminator(stream);
58 t = strtol(s.c_str(), NULL, 10);
59 }
60
deserialize(int64_t & t,const char * tag)61 void hadoop::ICsvArchive::deserialize(int64_t& t, const char* tag)
62 {
63 std::string s = readUptoTerminator(stream);
64 t = strtoll(s.c_str(), NULL, 10);
65 }
66
deserialize(float & t,const char * tag)67 void hadoop::ICsvArchive::deserialize(float& t, const char* tag)
68 {
69 std::string s = readUptoTerminator(stream);
70 t = strtof(s.c_str(), NULL);
71 }
72
deserialize(double & t,const char * tag)73 void hadoop::ICsvArchive::deserialize(double& t, const char* tag)
74 {
75 std::string s = readUptoTerminator(stream);
76 t = strtod(s.c_str(), NULL);
77 }
78
deserialize(std::string & t,const char * tag)79 void hadoop::ICsvArchive::deserialize(std::string& t, const char* tag)
80 {
81 std::string temp = readUptoTerminator(stream);
82 if (temp[0] != '\'') {
83 throw new IOException("Errror deserializing string.");
84 }
85 t.clear();
86 // skip first character, replace escaped characters
87 int len = temp.length();
88 for (int i = 1; i < len; i++) {
89 char c = temp.at(i);
90 if (c == '%') {
91 // since we escape '%', there have to be at least two chars following a '%'
92 char ch1 = temp.at(i+1);
93 char ch2 = temp.at(i+2);
94 i += 2;
95 if (ch1 == '0' && ch2 == '0') {
96 t.append(1, '\0');
97 } else if (ch1 == '0' && ch2 == 'A') {
98 t.append(1, '\n');
99 } else if (ch1 == '0' && ch2 == 'D') {
100 t.append(1, '\r');
101 } else if (ch1 == '2' && ch2 == 'C') {
102 t.append(1, ',');
103 } else if (ch1 == '7' && ch2 == 'D') {
104 t.append(1, '}');
105 } else if (ch1 == '2' && ch2 == '5') {
106 t.append(1, '%');
107 } else {
108 throw new IOException("Error deserializing string.");
109 }
110 }
111 else {
112 t.append(1, c);
113 }
114 }
115 }
116
deserialize(std::string & t,size_t & len,const char * tag)117 void hadoop::ICsvArchive::deserialize(std::string& t, size_t& len, const char* tag)
118 {
119 std::string s = readUptoTerminator(stream);
120 if (s[0] != '#') {
121 throw new IOException("Errror deserializing buffer.");
122 }
123 s.erase(0, 1); /// erase first character
124 len = s.length();
125 if (len%2 == 1) { // len is guaranteed to be even
126 throw new IOException("Errror deserializing buffer.");
127 }
128 len = len >> 1;
129 for (size_t idx = 0; idx < len; idx++) {
130 char buf[3];
131 buf[0] = s[2*idx];
132 buf[1] = s[2*idx+1];
133 buf[2] = '\0';
134 int i;
135 if (1 != sscanf(buf, "%2x", &i)) {
136 throw new IOException("Errror deserializing buffer.");
137 }
138 t.push_back((char) i);
139 }
140 len = t.length();
141 }
142
startRecord(Record & s,const char * tag)143 void hadoop::ICsvArchive::startRecord(Record& s, const char* tag)
144 {
145 if (tag != NULL) {
146 char mark[2];
147 if (2 != stream.read(mark, 2)) {
148 throw new IOException("Error deserializing record.");
149 }
150 if (mark[0] != 's' || mark[1] != '{') {
151 throw new IOException("Error deserializing record.");
152 }
153 }
154 }
155
endRecord(Record & s,const char * tag)156 void hadoop::ICsvArchive::endRecord(Record& s, const char* tag)
157 {
158 char mark;
159 if (1 != stream.read(&mark, 1)) {
160 throw new IOException("Error deserializing record.");
161 }
162 if (tag == NULL) {
163 if (mark != '\n') {
164 throw new IOException("Error deserializing record.");
165 }
166 } else if (mark != '}') {
167 throw new IOException("Error deserializing record.");
168 } else {
169 readUptoTerminator(stream);
170 }
171 }
172
startVector(const char * tag)173 Index* hadoop::ICsvArchive::startVector(const char* tag)
174 {
175 char mark[2];
176 if (2 != stream.read(mark, 2)) {
177 throw new IOException("Error deserializing vector.");
178 }
179 if (mark[0] != 'v' || mark[1] != '{') {
180 throw new IOException("Error deserializing vector.");
181 }
182 return new CsvIndex(stream);
183 }
184
endVector(Index * idx,const char * tag)185 void hadoop::ICsvArchive::endVector(Index* idx, const char* tag)
186 {
187 delete idx;
188 char mark;
189 if (1 != stream.read(&mark, 1)) {
190 throw new IOException("Error deserializing vector.");
191 }
192 if (mark != '}') {
193 throw new IOException("Error deserializing vector.");
194 }
195 readUptoTerminator(stream);
196 }
197
startMap(const char * tag)198 Index* hadoop::ICsvArchive::startMap(const char* tag)
199 {
200 char mark[2];
201 if (2 != stream.read(mark, 2)) {
202 throw new IOException("Error deserializing map.");
203 }
204 if (mark[0] != 'm' || mark[1] != '{') {
205 throw new IOException("Error deserializing map.");
206 }
207
208 return new CsvIndex(stream);
209 }
210
endMap(Index * idx,const char * tag)211 void hadoop::ICsvArchive::endMap(Index* idx, const char* tag)
212 {
213 delete idx;
214 char mark;
215 if (1 != stream.read(&mark, 1)) {
216 throw new IOException("Error deserializing map.");
217 }
218 if (mark != '}') {
219 throw new IOException("Error deserializing map.");
220 }
221 readUptoTerminator(stream);
222 }
223
~ICsvArchive()224 hadoop::ICsvArchive::~ICsvArchive()
225 {
226 }
227
serialize(int8_t t,const char * tag)228 void hadoop::OCsvArchive::serialize(int8_t t, const char* tag)
229 {
230 printCommaUnlessFirst();
231 char sval[5];
232 sprintf(sval, "%d", t);
233 stream.write(sval, strlen(sval));
234 }
235
serialize(bool t,const char * tag)236 void hadoop::OCsvArchive::serialize(bool t, const char* tag)
237 {
238 printCommaUnlessFirst();
239 const char *sval = t ? "T" : "F";
240 stream.write(sval,1);
241 }
242
serialize(int32_t t,const char * tag)243 void hadoop::OCsvArchive::serialize(int32_t t, const char* tag)
244 {
245 printCommaUnlessFirst();
246 char sval[128];
247 sprintf(sval, "%d", t);
248 stream.write(sval, strlen(sval));
249 }
250
serialize(int64_t t,const char * tag)251 void hadoop::OCsvArchive::serialize(int64_t t, const char* tag)
252 {
253 printCommaUnlessFirst();
254 char sval[128];
255 sprintf(sval, "%lld", t);
256 stream.write(sval, strlen(sval));
257 }
258
serialize(float t,const char * tag)259 void hadoop::OCsvArchive::serialize(float t, const char* tag)
260 {
261 printCommaUnlessFirst();
262 char sval[128];
263 sprintf(sval, "%f", t);
264 stream.write(sval, strlen(sval));
265 }
266
serialize(double t,const char * tag)267 void hadoop::OCsvArchive::serialize(double t, const char* tag)
268 {
269 printCommaUnlessFirst();
270 char sval[128];
271 sprintf(sval, "%lf", t);
272 stream.write(sval, strlen(sval));
273 }
274
serialize(const std::string & t,const char * tag)275 void hadoop::OCsvArchive::serialize(const std::string& t, const char* tag)
276 {
277 printCommaUnlessFirst();
278 stream.write("'",1);
279 int len = t.length();
280 for (int idx = 0; idx < len; idx++) {
281 char c = t[idx];
282 switch(c) {
283 case '\0':
284 stream.write("%00",3);
285 break;
286 case 0x0A:
287 stream.write("%0A",3);
288 break;
289 case 0x0D:
290 stream.write("%0D",3);
291 break;
292 case 0x25:
293 stream.write("%25",3);
294 break;
295 case 0x2C:
296 stream.write("%2C",3);
297 break;
298 case 0x7D:
299 stream.write("%7D",3);
300 break;
301 default:
302 stream.write(&c,1);
303 break;
304 }
305 }
306 }
307
serialize(const std::string & t,size_t len,const char * tag)308 void hadoop::OCsvArchive::serialize(const std::string& t, size_t len, const char* tag)
309 {
310 printCommaUnlessFirst();
311 stream.write("#",1);
312 for(size_t idx = 0; idx < len; idx++) {
313 uint8_t b = t[idx];
314 char sval[3];
315 sprintf(sval,"%2x",b);
316 stream.write(sval, 2);
317 }
318 }
319
startRecord(const Record & s,const char * tag)320 void hadoop::OCsvArchive::startRecord(const Record& s, const char* tag)
321 {
322 printCommaUnlessFirst();
323 if (tag != NULL && strlen(tag) != 0) {
324 stream.write("s{",2);
325 }
326 isFirst = true;
327 }
328
endRecord(const Record & s,const char * tag)329 void hadoop::OCsvArchive::endRecord(const Record& s, const char* tag)
330 {
331 if (tag == NULL || strlen(tag) == 0) {
332 stream.write("\n",1);
333 isFirst = true;
334 } else {
335 stream.write("}",1);
336 isFirst = false;
337 }
338 }
339
startVector(size_t len,const char * tag)340 void hadoop::OCsvArchive::startVector(size_t len, const char* tag)
341 {
342 printCommaUnlessFirst();
343 stream.write("v{",2);
344 isFirst = true;
345 }
346
endVector(size_t len,const char * tag)347 void hadoop::OCsvArchive::endVector(size_t len, const char* tag)
348 {
349 stream.write("}",1);
350 isFirst = false;
351 }
352
startMap(size_t len,const char * tag)353 void hadoop::OCsvArchive::startMap(size_t len, const char* tag)
354 {
355 printCommaUnlessFirst();
356 stream.write("m{",2);
357 isFirst = true;
358 }
359
endMap(size_t len,const char * tag)360 void hadoop::OCsvArchive::endMap(size_t len, const char* tag)
361 {
362 stream.write("}",1);
363 isFirst = false;
364 }
365
~OCsvArchive()366 hadoop::OCsvArchive::~OCsvArchive()
367 {
368 }
369