1 /***********************************************************************
2 * *
3 * This software is part of the ast package *
4 * Copyright (c) 2011-2012 AT&T Intellectual Property *
5 * and is licensed under the *
6 * Eclipse Public License, Version 1.0 *
7 * by AT&T Intellectual Property *
8 * *
9 * A copy of the License is available at *
10 * http://www.eclipse.org/org/documents/epl-v10.html *
11 * (with md5 checksum b35adb5213ca9657e911e9befb180842) *
12 * *
13 * Information and Software Systems Research *
14 * AT&T Research *
15 * Florham Park NJ *
16 * *
17 * Glenn Fowler <glenn.s.fowler@gmail.com> *
18 * *
19 ***********************************************************************/
20 #pragma prototyped
21
/*
 * optget() usage text for the sort query.  sort_beg() appends it to the
 * identification header taken from dss_lib_sort.description to form the
 * complete option string.
 */
static const char sort_usage[] =
"[+PLUGIN?\findex\f]"
"[+DESCRIPTION?The sort query writes the sorted input records to the"
" standard output. The unsorted record stream is passed to the"
" next query, if any. The sort keys are the \afield\a operands,"
" or the raw record data if there are no operands.]"
"[c:count?Prepend an integer count field of the number of records that "
"compare equal. Records with count less than \athreshold\a are "
"omitted.]#?[threshold:=1]"
"[r:reverse|invert?Reverse the sense of comparisons.]"
"[u:unique?Keep only the first of multiple records that compare equal on "
"all keys.]"
"\n"
"\n[ field ... ]\n"
"\n"
"[+CAVEATS?Currently all data is sorted in memory -- spillover to "
"temporary files not implemented yet.]"
;
40
41 #include <dsslib.h>
42 #include <recsort.h>
43 #include <stk.h>
44
45 struct State_s; typedef struct State_s State_t;
46
47 struct State_s
48 {
49 Rsdisc_t sortdisc;
50 Rsdisc_t uniqdisc;
51 Rskeydisc_t keydisc;
52 Rskey_t* sortkey;
53 Rskey_t* uniqkey;
54 Rs_t* sort;
55 Rs_t* uniq;
56 Cx_t* cx;
57 Sfio_t* op;
58 Sfio_t* sortstack;
59 Sfio_t* uniqstack;
60 Sfio_t* tmp;
61 char* sortbase;
62 char* uniqbase;
63 void* data;
64 Dssfile_t* file;
65 Vmalloc_t* rm;
66 Vmalloc_t* vm;
67 size_t count;
68 };
69
70 extern Dsslib_t dss_lib_sort;
71
72 static ssize_t
key(Rs_t * rs,unsigned char * data,size_t datasize,unsigned char * key,size_t keysize,Rsdisc_t * disc)73 key(Rs_t* rs, unsigned char* data, size_t datasize, unsigned char* key, size_t keysize, Rsdisc_t* disc)
74 {
75 State_t* state = (State_t*)disc;
76 Rskeyfield_t* field;
77 Cxoperand_t r;
78 unsigned char* k;
79 unsigned char* e;
80
81 k = key;
82 e = k + keysize;
83 for (field = state->sortkey->head; field; field = field->next)
84 {
85 if (cxcast(state->cx, &r, (Cxvariable_t*)field->user, state->cx->state->type_string, state->data, NiL))
86 return -1;
87 k += field->coder(state->sortkey, field, (unsigned char*)r.value.string.data, r.value.string.size, k, e);
88 }
89 return k - key;
90 }
91
92 static ssize_t
rev(Rs_t * rs,unsigned char * data,size_t datasize,unsigned char * key,size_t keysize,Rsdisc_t * disc)93 rev(Rs_t* rs, unsigned char* data, size_t datasize, unsigned char* key, size_t keysize, Rsdisc_t* disc)
94 {
95 State_t* state = (State_t*)disc;
96
97 return state->sortkey->head->coder(state->sortkey, state->sortkey->head, data, datasize, key, key + keysize);
98 }
99
100 static int
count(Rs_t * rs,int op,void * data,void * arg,Rsdisc_t * disc)101 count(Rs_t* rs, int op, void* data, void* arg, Rsdisc_t* disc)
102 {
103 State_t* state = (State_t*)disc;
104 Rsobj_t* r;
105 Rsobj_t* q;
106 char* s;
107 ssize_t n;
108
109 switch (op)
110 {
111 case RS_POP:
112 break;
113 case RS_WRITE:
114 r = (Rsobj_t*)data;
115 n = 1;
116 for (q = r->equal; q; q = q->right)
117 n++;
118 if (n >= state->count)
119 {
120 n = sfprintf(state->uniqstack, "%I*u %-.*s", sizeof(n), n, r->datalen, r->data);
121 s = stkfreeze(state->uniqstack, 0);
122 if (rsprocess(state->uniq, s, -n) <= 0)
123 {
124 if (state->cx->disc->errorf)
125 (*state->cx->disc->errorf)(state->cx, disc, ERROR_SYSTEM|2, "uniq record process error");
126 return -1;
127 }
128 }
129 return RS_DELETE;
130 default:
131 return -1;
132 }
133 return 0;
134 }
135
/*
 * sort_beg -- begin callback for the sort query.
 *
 * data is the query argv (char**). This function:
 *   1. allocates a State_t in a private vmalloc heap;
 *   2. opens the main sorter and its key;
 *   3. parses options and field operands against the usage string;
 *   4. for --count, sets up a second sorter keyed on the leading numeric
 *      count field that count() prepends;
 *   5. stores the state in expr->data.
 *
 * Returns 0 on success. On any failure, everything acquired so far is
 * released through the "bad" label and -1 is returned.
 */
static int
sort_beg(Cx_t* cx, Cxexpr_t* expr, void* data, Cxdisc_t* disc)
{
	char**		argv = (char**)data;
	int		errors = error_info.errors;	/* baseline to detect option errors */
	int		n;				/* set once a field operand key is added */
	int		uniq;				/* --unique was given */
	char*		s;
	char*		t;
	char*		num;				/* default key options for numeric fields */
	State_t*	state;
	Cxvariable_t*	variable;
	Vmalloc_t*	vm;
	char		opt[2];				/* one-letter option string for rskeyopt() */

	/* all query state lives in its own heap so one vmclose() releases it */
	if (!(vm = vmopen(Vmdcheap, Vmlast, 0)) || !(state = vmnewof(vm, 0, State_t, 1, 0)))
	{
		if (vm)
			vmclose(vm);
		if (disc->errorf)
			(*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "out of space");
		return -1;
	}
	/*
	 * NOTE(review): the bad: path tests state members for 0, so it relies on
	 * vmnewof() returning zero-filled memory -- confirm
	 */
	state->vm = vm;
	state->keydisc.version = RSKEY_VERSION;
	state->keydisc.errorf = disc->errorf;
	if (!(state->sortkey = rskeyopen(&state->keydisc, &state->sortdisc)))
		goto bad;
	if (!(state->sort = rsnew(state->sortkey->disc)))
		goto bad;
	/* serialized records are written onto this stk through state->file */
	if (!(state->sortstack = stkopen(0)))
		goto bad;
	if (!(state->tmp = sfstropen()))
		goto bad;
	state->sortbase = stkptr(state->sortstack, 0);
	if (!(state->file = dssfopen(DSS(cx), "-", state->sortstack, DSS_FILE_WRITE, 0)))
		goto bad;
	/* option string = library id header (from the first '[') + sort_usage */
	sfprintf(cx->buf, "%s%s", strchr(dss_lib_sort.description, '['), sort_usage);
	s = sfstruse(cx->buf);
	uniq = 0;
	for (;;)
	{
		switch (optget(argv, s))
		{
		case 0:
			/* end of options */
			break;
		case 'c':
			state->count = (size_t)opt_info.number;
			continue;
		case 'u':
			uniq = 1;
			continue;
		case '?':
			/* usage/help request; without an error function treat as failure */
			if (disc->errorf)
			{
				(*disc->errorf)(cx, disc, ERROR_USAGE|4, "%s", opt_info.arg);
			}
			else
				goto bad;
			continue;
		case ':':
			/* option error: reported at level 2, detected by the errors check below */
			if (disc->errorf)
				(*disc->errorf)(cx, disc, 2, "%s", opt_info.arg);
			else
				goto bad;
			continue;
		default:
			/* any other option letter (e.g. -r) is handed to the key parser */
			opt[0] = opt_info.option[1];
			opt[1] = 0;
			if (rskeyopt(state->sortkey, opt, 1))
				goto bad;
			continue;
		}
		break;
	}
	if (error_info.errors > errors)
		goto bad;
	argv += opt_info.index;
	n = 0;
	/* numeric fields default to numeric keys, reversed if the head field has rflag */
	num = state->sortkey->head && state->sortkey->head->rflag ? "nr" : "n";
	/* each operand is "name" or "name-keyoptions" */
	while (s = *argv++)
	{
		if (t = strchr(s, '-'))
		{
			/* copy the name part so it can be looked up on its own */
			sfwrite(cx->buf, s, t - s);
			s = sfstruse(cx->buf);
			t++;
		}
		else
			t = 0;
		if (!(variable = cxvariable(cx, s, NiL, disc)))
			goto bad;
		if (rskey(state->sortkey, t ? t : cxisnumber(variable->type) ? num : "", 0))
			goto bad;
		/* key() reads the variable back from field->user */
		state->sortkey->tail->user = variable;
		n = 1;
	}
	if (uniq)
	{
		state->sortkey->type &= ~RS_DATA;
		state->sortkey->type |= RS_UNIQ;
	}
	if (state->count)
	{
		/* count() intercepts RS_WRITE and feeds "count record" lines to state->uniq */
		state->sortdisc.events |= RS_WRITE;
		state->sortdisc.eventf = count;
		if (!(state->uniqstack = stkopen(0)))
			goto bad;
		state->uniqbase = stkptr(state->uniqstack, 0);
		if (!(state->uniqkey = rskeyopen(&state->keydisc, &state->uniqdisc)))
			goto bad;
		if (!(state->uniq = rsnew(state->uniqkey->disc)))
			goto bad;
		/* key on the first field, numerically: the count prepended by count() */
		if (rskey(state->uniqkey, "1n", 0))
			goto bad;
		if (rskeyinit(state->uniqkey))
		{
			if (disc->errorf)
				(*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "uniq key error");
			goto bad;
		}
		if (rsinit(state->uniq, state->uniqkey->meth, state->uniqkey->procsize, state->uniqkey->type, state->uniqkey))
		{
			if (disc->errorf)
				(*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "uniq initialization error");
			goto bad;
		}
	}
	if (rskeyinit(state->sortkey))
	{
		if (disc->errorf)
			(*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "sort key error");
		goto bad;
	}
	/* field operands use key(); no operands but reverse uses rev() on the raw record */
	if (n)
	{
		state->sortdisc.defkeyf = key;
		state->sortdisc.key = 1;
	}
	else if (state->sortkey->head->rflag)
	{
		state->sortdisc.defkeyf = rev;
		state->sortdisc.key = 1;
	}
	if (rsinit(state->sort, state->sortkey->meth, state->sortkey->procsize, state->sortkey->type, state->sortkey))
	{
		if (disc->errorf)
			(*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "sort initialization error");
		goto bad;
	}
	state->cx = cx;
	state->op = expr->op;
	expr->data = state;
	return 0;
 bad:
	/* release whatever was acquired; unset members are 0 */
	if (state->sort)
		rsclose(state->sort);
	if (state->uniq)
		rsclose(state->uniq);
	if (state->sortkey)
		rskeyclose(state->sortkey);
	if (state->uniqkey)
		rskeyclose(state->uniqkey);
	/* NOTE(review): dssfclose() presumably also closes its io (sortstack), hence the else */
	if (state->file)
		dssfclose(state->file);
	else if (state->sortstack)
		stkclose(state->sortstack);
	if (state->uniqstack)
		stkclose(state->uniqstack);
	if (state->tmp)
		sfstrclose(state->tmp);
	/* frees state itself too */
	vmclose(state->vm);
	return -1;
}
310
311 static int
sort_act(Cx_t * cx,Cxexpr_t * expr,void * data,Cxdisc_t * disc)312 sort_act(Cx_t* cx, Cxexpr_t* expr, void* data, Cxdisc_t* disc)
313 {
314 State_t* state = (State_t*)expr->data;
315 Dssrecord_t* record = (Dssrecord_t*)data;
316 char* s;
317 ssize_t n;
318
319 if (dssfwrite(state->file, record))
320 return -1;
321 n = stktell(state->file->io);
322 s = stkfreeze(state->file->io, 0);
323 state->data = data;
324 if (rsprocess(state->sort, s, -n) <= 0)
325 {
326 if (disc->errorf)
327 (*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "sort record process error");
328 return -1;
329 }
330 return 0;
331 }
332
333 static int
sort_end(Cx_t * cx,Cxexpr_t * expr,void * data,Cxdisc_t * disc)334 sort_end(Cx_t* cx, Cxexpr_t* expr, void* data, Cxdisc_t* disc)
335 {
336 State_t* state = (State_t*)expr->data;
337 int r;
338
339 r = 0;
340 if (rswrite(state->sort, expr->op, RS_OTEXT))
341 {
342 if (disc->errorf)
343 (*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "sort output error");
344 r = -1;
345 }
346 while (rsdisc(state->sort, NiL, RS_POP));
347 if (rsclose(state->sort))
348 {
349 if (disc->errorf)
350 (*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "sort close error");
351 r = -1;
352 }
353 if (rskeyclose(state->sortkey))
354 {
355 if (disc->errorf)
356 (*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "sort key error");
357 r = -1;
358 }
359 if (state->uniq)
360 {
361 if (rswrite(state->uniq, expr->op, RS_OTEXT))
362 {
363 if (disc->errorf)
364 (*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "uniq output error");
365 r = -1;
366 }
367 while (rsdisc(state->uniq, NiL, RS_POP));
368 if (rsclose(state->uniq))
369 {
370 if (disc->errorf)
371 (*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "uniq close error");
372 r = -1;
373 }
374 if (rskeyclose(state->uniqkey))
375 {
376 if (disc->errorf)
377 (*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "uniq key error");
378 r = -1;
379 }
380 stkclose(state->uniqstack);
381 }
382 dssfclose(state->file);
383 sfstrclose(state->tmp);
384 vmclose(state->vm);
385 return r;
386 }
387
/*
 * Query table for this library, referenced from dss_lib_sort.
 *
 * Positional initializer, in order:
 *   - query name;
 *   - one-line description;
 *   - CXH;
 *   - the begin callback (sort_beg);
 *   - a 0 slot;
 *   - the action callback (sort_act);
 *   - the end callback (sort_end).
 *
 * NOTE(review): confirm the meaning of CXH and of the 0 slot against the
 * Cxquery_t declaration.
 *
 * The trailing all-zero entry presumably terminates the table.
 */
static Cxquery_t queries[] =
{
	{
		"sort",
		"sort records to the standard output",
		CXH,
		sort_beg,
		0,
		sort_act,
		sort_end
	},
	{0}
};
401
/*
 * Library descriptor for the sort query.
 *
 * The description is "sort query" followed by an optget-style
 * identification header and USAGE_LICENSE. sort_beg() copies the text
 * starting at its first '[' and appends sort_usage to build the query's
 * option string.
 *
 * The final member points at the queries table; the remaining zero slots
 * are left unset (NOTE(review): field meanings per the Dsslib_t declaration).
 */
Dsslib_t dss_lib_sort =
{
	"sort",
	"sort query"
	"[-1lms5P?\n@(#)$Id: dss sort query (AT&T Research) 2011-10-18 $\n]"
	USAGE_LICENSE,
	CXH,
	0,
	0,
	0,
	0,
	0,
	0,
	&queries[0]
};
417