1 /***********************************************************************
2 *                                                                      *
3 *               This software is part of the ast package               *
4 *          Copyright (c) 2011-2012 AT&T Intellectual Property          *
5 *                      and is licensed under the                       *
6 *                 Eclipse Public License, Version 1.0                  *
7 *                    by AT&T Intellectual Property                     *
8 *                                                                      *
9 *                A copy of the License is available at                 *
10 *          http://www.eclipse.org/org/documents/epl-v10.html           *
11 *         (with md5 checksum b35adb5213ca9657e911e9befb180842)         *
12 *                                                                      *
13 *              Information and Software Systems Research               *
14 *                            AT&T Research                             *
15 *                           Florham Park NJ                            *
16 *                                                                      *
17 *               Glenn Fowler <glenn.s.fowler@gmail.com>                *
18 *                                                                      *
19 ***********************************************************************/
20 #pragma prototyped
21 
/*
 * optget(3) usage description for the sort query. sort_beg() appends
 * this text to the '[' suffix of dss_lib_sort.description and hands the
 * result to optget() to parse the query arguments.
 *
 * -c takes an optional numeric argument named "threshold" (default 1);
 * the description text refers to it by that same name.
 */
static const char sort_usage[] =
"[+PLUGIN?\findex\f]"
"[+DESCRIPTION?The sort query writes the sorted input records to the"
"	standard output. The unsorted record stream is passed to the"
"	next query, if any. The sort keys are the \afield\a operands,"
"	or the raw record data if there are no operands.]"
"[c:count?Prepend an integer count field of the number of records that "
    "compare equal. Records with count less than \athreshold\a are "
    "omitted.]#?[threshold:=1]"
"[r:reverse|invert?Reverse the sense of comparisons.]"
"[u:unique?Keep only the first of multiple records that compare equal on "
    "all keys.]"
"\n"
"\n[ field ... ]\n"
"\n"
"[+CAVEATS?Currently all data is sorted in memory -- spillover to "
    "temporary files not implemented yet.]"
;
40 
41 #include <dsslib.h>
42 #include <recsort.h>
43 #include <stk.h>
44 
45 struct State_s; typedef struct State_s State_t;
46 
47 struct State_s
48 {
49 	Rsdisc_t	sortdisc;
50 	Rsdisc_t	uniqdisc;
51 	Rskeydisc_t	keydisc;
52 	Rskey_t*	sortkey;
53 	Rskey_t*	uniqkey;
54 	Rs_t*		sort;
55 	Rs_t*		uniq;
56 	Cx_t*		cx;
57 	Sfio_t*		op;
58 	Sfio_t*		sortstack;
59 	Sfio_t*		uniqstack;
60 	Sfio_t*		tmp;
61 	char*		sortbase;
62 	char*		uniqbase;
63 	void*		data;
64 	Dssfile_t*	file;
65 	Vmalloc_t*	rm;
66 	Vmalloc_t*	vm;
67 	size_t		count;
68 };
69 
70 extern Dsslib_t	dss_lib_sort;
71 
72 static ssize_t
key(Rs_t * rs,unsigned char * data,size_t datasize,unsigned char * key,size_t keysize,Rsdisc_t * disc)73 key(Rs_t* rs, unsigned char* data, size_t datasize, unsigned char* key, size_t keysize, Rsdisc_t* disc)
74 {
75 	State_t*	state = (State_t*)disc;
76 	Rskeyfield_t*	field;
77 	Cxoperand_t	r;
78 	unsigned char*	k;
79 	unsigned char*	e;
80 
81 	k = key;
82 	e = k + keysize;
83 	for (field = state->sortkey->head; field; field = field->next)
84 	{
85 		if (cxcast(state->cx, &r, (Cxvariable_t*)field->user, state->cx->state->type_string, state->data, NiL))
86 			return -1;
87 		k += field->coder(state->sortkey, field, (unsigned char*)r.value.string.data, r.value.string.size, k, e);
88 	}
89 	return k - key;
90 }
91 
92 static ssize_t
rev(Rs_t * rs,unsigned char * data,size_t datasize,unsigned char * key,size_t keysize,Rsdisc_t * disc)93 rev(Rs_t* rs, unsigned char* data, size_t datasize, unsigned char* key, size_t keysize, Rsdisc_t* disc)
94 {
95 	State_t*	state = (State_t*)disc;
96 
97 	return state->sortkey->head->coder(state->sortkey, state->sortkey->head, data, datasize, key, key + keysize);
98 }
99 
100 static int
count(Rs_t * rs,int op,void * data,void * arg,Rsdisc_t * disc)101 count(Rs_t* rs, int op, void* data, void* arg, Rsdisc_t* disc)
102 {
103 	State_t*	state = (State_t*)disc;
104 	Rsobj_t*	r;
105 	Rsobj_t*	q;
106 	char*		s;
107 	ssize_t		n;
108 
109 	switch (op)
110 	{
111 	case RS_POP:
112 		break;
113 	case RS_WRITE:
114 		r = (Rsobj_t*)data;
115 		n = 1;
116 		for (q = r->equal; q; q = q->right)
117 			n++;
118 		if (n >= state->count)
119 		{
120 			n = sfprintf(state->uniqstack, "%I*u %-.*s", sizeof(n), n, r->datalen, r->data);
121 			s = stkfreeze(state->uniqstack, 0);
122 			if (rsprocess(state->uniq, s, -n) <= 0)
123 			{
124 				if (state->cx->disc->errorf)
125 					(*state->cx->disc->errorf)(state->cx, disc, ERROR_SYSTEM|2, "uniq record process error");
126 				return -1;
127 			}
128 		}
129 		return RS_DELETE;
130 	default:
131 		return -1;
132 	}
133 	return 0;
134 }
135 
136 static int
sort_beg(Cx_t * cx,Cxexpr_t * expr,void * data,Cxdisc_t * disc)137 sort_beg(Cx_t* cx, Cxexpr_t* expr, void* data, Cxdisc_t* disc)
138 {
139 	char**		argv = (char**)data;
140 	int		errors = error_info.errors;
141 	int		n;
142 	int		uniq;
143 	char*		s;
144 	char*		t;
145 	char*		num;
146 	State_t*	state;
147 	Cxvariable_t*	variable;
148 	Vmalloc_t*	vm;
149 	char		opt[2];
150 
151 	if (!(vm = vmopen(Vmdcheap, Vmlast, 0)) || !(state = vmnewof(vm, 0, State_t, 1, 0)))
152 	{
153 		if (vm)
154 			vmclose(vm);
155 		if (disc->errorf)
156 			(*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "out of space");
157 		return -1;
158 	}
159 	state->vm = vm;
160 	state->keydisc.version = RSKEY_VERSION;
161 	state->keydisc.errorf = disc->errorf;
162 	if (!(state->sortkey = rskeyopen(&state->keydisc, &state->sortdisc)))
163 		goto bad;
164 	if (!(state->sort = rsnew(state->sortkey->disc)))
165 		goto bad;
166 	if (!(state->sortstack = stkopen(0)))
167 		goto bad;
168 	if (!(state->tmp = sfstropen()))
169 		goto bad;
170 	state->sortbase = stkptr(state->sortstack, 0);
171 	if (!(state->file = dssfopen(DSS(cx), "-", state->sortstack, DSS_FILE_WRITE, 0)))
172 		goto bad;
173 	sfprintf(cx->buf, "%s%s", strchr(dss_lib_sort.description, '['), sort_usage);
174 	s = sfstruse(cx->buf);
175 	uniq = 0;
176 	for (;;)
177 	{
178 		switch (optget(argv, s))
179 		{
180 		case 0:
181 			break;
182 		case 'c':
183 			state->count = (size_t)opt_info.number;
184 			continue;
185 		case 'u':
186 			uniq = 1;
187 			continue;
188 		case '?':
189 			if (disc->errorf)
190 			{
191 				(*disc->errorf)(cx, disc, ERROR_USAGE|4, "%s", opt_info.arg);
192 			}
193 			else
194 				goto bad;
195 			continue;
196 		case ':':
197 			if (disc->errorf)
198 				(*disc->errorf)(cx, disc, 2, "%s", opt_info.arg);
199 			else
200 				goto bad;
201 			continue;
202 		default:
203 			opt[0] = opt_info.option[1];
204 			opt[1] = 0;
205 			if (rskeyopt(state->sortkey, opt, 1))
206 				goto bad;
207 			continue;
208 		}
209 		break;
210 	}
211 	if (error_info.errors > errors)
212 		goto bad;
213 	argv += opt_info.index;
214 	n = 0;
215 	num = state->sortkey->head && state->sortkey->head->rflag ? "nr" : "n";
216 	while (s = *argv++)
217 	{
218 		if (t = strchr(s, '-'))
219 		{
220 			sfwrite(cx->buf, s, t - s);
221 			s = sfstruse(cx->buf);
222 			t++;
223 		}
224 		else
225 			t = 0;
226 		if (!(variable = cxvariable(cx, s, NiL, disc)))
227 			goto bad;
228 		if (rskey(state->sortkey, t ? t : cxisnumber(variable->type) ? num : "", 0))
229 			goto bad;
230 		state->sortkey->tail->user = variable;
231 		n = 1;
232 	}
233 	if (uniq)
234 	{
235 		state->sortkey->type &= ~RS_DATA;
236 		state->sortkey->type |= RS_UNIQ;
237 	}
238 	if (state->count)
239 	{
240 		state->sortdisc.events |= RS_WRITE;
241 		state->sortdisc.eventf = count;
242 		if (!(state->uniqstack = stkopen(0)))
243 			goto bad;
244 		state->uniqbase = stkptr(state->uniqstack, 0);
245 		if (!(state->uniqkey = rskeyopen(&state->keydisc, &state->uniqdisc)))
246 			goto bad;
247 		if (!(state->uniq = rsnew(state->uniqkey->disc)))
248 			goto bad;
249 		if (rskey(state->uniqkey, "1n", 0))
250 			goto bad;
251 		if (rskeyinit(state->uniqkey))
252 		{
253 			if (disc->errorf)
254 				(*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "uniq key error");
255 			goto bad;
256 		}
257 		if (rsinit(state->uniq, state->uniqkey->meth, state->uniqkey->procsize, state->uniqkey->type, state->uniqkey))
258 		{
259 			if (disc->errorf)
260 				(*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "uniq initialization error");
261 			goto bad;
262 		}
263 	}
264 	if (rskeyinit(state->sortkey))
265 	{
266 		if (disc->errorf)
267 			(*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "sort key error");
268 		goto bad;
269 	}
270 	if (n)
271 	{
272 		state->sortdisc.defkeyf = key;
273 		state->sortdisc.key = 1;
274 	}
275 	else if (state->sortkey->head->rflag)
276 	{
277 		state->sortdisc.defkeyf = rev;
278 		state->sortdisc.key = 1;
279 	}
280 	if (rsinit(state->sort, state->sortkey->meth, state->sortkey->procsize, state->sortkey->type, state->sortkey))
281 	{
282 		if (disc->errorf)
283 			(*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "sort initialization error");
284 		goto bad;
285 	}
286 	state->cx = cx;
287 	state->op = expr->op;
288 	expr->data = state;
289 	return 0;
290  bad:
291 	if (state->sort)
292 		rsclose(state->sort);
293 	if (state->uniq)
294 		rsclose(state->uniq);
295 	if (state->sortkey)
296 		rskeyclose(state->sortkey);
297 	if (state->uniqkey)
298 		rskeyclose(state->uniqkey);
299 	if (state->file)
300 		dssfclose(state->file);
301 	else if (state->sortstack)
302 		stkclose(state->sortstack);
303 	if (state->uniqstack)
304 		stkclose(state->uniqstack);
305 	if (state->tmp)
306 		sfstrclose(state->tmp);
307 	vmclose(state->vm);
308 	return -1;
309 }
310 
311 static int
sort_act(Cx_t * cx,Cxexpr_t * expr,void * data,Cxdisc_t * disc)312 sort_act(Cx_t* cx, Cxexpr_t* expr, void* data, Cxdisc_t* disc)
313 {
314 	State_t*	state = (State_t*)expr->data;
315 	Dssrecord_t*	record = (Dssrecord_t*)data;
316 	char*		s;
317 	ssize_t		n;
318 
319 	if (dssfwrite(state->file, record))
320 		return -1;
321 	n = stktell(state->file->io);
322 	s = stkfreeze(state->file->io, 0);
323 	state->data = data;
324 	if (rsprocess(state->sort, s, -n) <= 0)
325 	{
326 		if (disc->errorf)
327 			(*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "sort record process error");
328 		return -1;
329 	}
330 	return 0;
331 }
332 
333 static int
sort_end(Cx_t * cx,Cxexpr_t * expr,void * data,Cxdisc_t * disc)334 sort_end(Cx_t* cx, Cxexpr_t* expr, void* data, Cxdisc_t* disc)
335 {
336 	State_t*	state = (State_t*)expr->data;
337 	int		r;
338 
339 	r = 0;
340 	if (rswrite(state->sort, expr->op, RS_OTEXT))
341 	{
342 		if (disc->errorf)
343 			(*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "sort output error");
344 		r = -1;
345 	}
346 	while (rsdisc(state->sort, NiL, RS_POP));
347 	if (rsclose(state->sort))
348 	{
349 		if (disc->errorf)
350 			(*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "sort close error");
351 		r = -1;
352 	}
353 	if (rskeyclose(state->sortkey))
354 	{
355 		if (disc->errorf)
356 			(*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "sort key error");
357 		r = -1;
358 	}
359 	if (state->uniq)
360 	{
361 		if (rswrite(state->uniq, expr->op, RS_OTEXT))
362 		{
363 			if (disc->errorf)
364 				(*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "uniq output error");
365 			r = -1;
366 		}
367 		while (rsdisc(state->uniq, NiL, RS_POP));
368 		if (rsclose(state->uniq))
369 		{
370 			if (disc->errorf)
371 				(*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "uniq close error");
372 			r = -1;
373 		}
374 		if (rskeyclose(state->uniqkey))
375 		{
376 			if (disc->errorf)
377 				(*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "uniq key error");
378 			r = -1;
379 		}
380 		stkclose(state->uniqstack);
381 	}
382 	dssfclose(state->file);
383 	sfstrclose(state->tmp);
384 	vmclose(state->vm);
385 	return r;
386 }
387 
388 static Cxquery_t	queries[] =
389 {
390 	{
391 		"sort",
392 		"sort records to the standard output",
393 		CXH,
394 		sort_beg,
395 		0,
396 		sort_act,
397 		sort_end
398 	},
399 	{0}
400 };
401 
402 Dsslib_t		dss_lib_sort =
403 {
404 	"sort",
405 	"sort query"
406 	"[-1lms5P?\n@(#)$Id: dss sort query (AT&T Research) 2011-10-18 $\n]"
407 	USAGE_LICENSE,
408 	CXH,
409 	0,
410 	0,
411 	0,
412 	0,
413 	0,
414 	0,
415 	&queries[0]
416 };
417