1 /***********************************************************************
2 *                                                                      *
3 *               This software is part of the ast package               *
4 *          Copyright (c) 2006-2011 AT&T Intellectual Property          *
5 *                      and is licensed under the                       *
6 *                 Eclipse Public License, Version 1.0                  *
7 *                    by AT&T Intellectual Property                     *
8 *                                                                      *
9 *                A copy of the License is available at                 *
10 *          http://www.eclipse.org/org/documents/epl-v10.html           *
11 *         (with md5 checksum b35adb5213ca9657e911e9befb180842)         *
12 *                                                                      *
13 *              Information and Software Systems Research               *
14 *                            AT&T Research                             *
15 *                           Florham Park NJ                            *
16 *                                                                      *
17 *               Glenn Fowler <glenn.s.fowler@gmail.com>                *
18 *                                                                      *
19 ***********************************************************************/
20 #pragma prototyped
21 
/*
 * sort glean discipline: gleans minima and/or maxima from a record stream
 */
25 
26 static const char usage[] =
27 "[-1lp0s5P?\n@(#)$Id: glean (AT&T Research) 2007-04-20 $\n]"
28 USAGE_LICENSE
29 "[+PLUGIN?glean - glean minima and/or maxima from record stream]"
30 "[+DESCRIPTION?The \bglean\b \bsort\b(1) discipline gleans minima "
31     "and maxima from a record stream. Each record is categorized by the main "
32     "sort key. If there is no main sort key then all records are in one "
33     "category. If the \bmin\b key sorts less than the current category "
34     "minimum or if the \bmax\b key sorts greater than the category maximum "
35     "then the category minimum or maximum is updated and the record is "
36     "written to the standard output. \bmax=\b\ak1\a,\bmax=\b\ak2\a sets one "
37     "maximum using the two sort keys \ak1\a and \ak2\a. "
38     "\bmax:=\b\ak1\a,\bmax:=\b\ak2\a sets two maxima, the first using sort "
39     "key \ak1\a and the second using sort key \ak2\a.]"
40 "[+?Note that \b,\b is the plugin option separator. Literal \b,\b must "
41     "either be quoted or escaped \aafter\a normal shell expansion.]"
42 "[a:absolute?List only the absolute minimum/maximum records for each "
43     "category.]"
44 "[c:count?Precede each output record with its category index, "
45     "category count, and total record count.]"
46 "[f:flush?Flush each line written to the standard output.]"
47 "[m:min?Mimima \bsort\b(1) key specification.]:[sort-key]"
48 "[M:max?Maxima \bsort\b(1) key specification.]:[sort-key]"
49 "[+EXAMPLES]"
50     "{"
51         "[+sort -t, -k1,1 -k2,2n -lglean,flush,min='\"3,3n\"',min='\"4,4\"' old.dat -?Sorts "
52             "on the \b,\b separated fields 1 (string) and 2 (numeric) and "
53             "lists the current minimal records per field 3 (numeric) and "
54             "field 4 (string), for each value of the catenation of fields 1 and 2.]"
55     "}"
56 "[+SEE ALSO?\bsort\b(1)]"
57 "\n\n--library=glean[,option[=value]...]\n\n"
58 ;
59 
60 #include <ast.h>
61 #include <debug.h>
62 #include <dt.h>
63 #include <error.h>
64 #include <recsort.h>
65 #include <vmalloc.h>
66 
/*
 * growable byte buffer maintained by save()
 */

typedef struct Data_s Data_t;

struct Data_s
{
	void*		data;		/* buffer, 0 until first allocated	*/
	size_t		size;		/* bytes allocated at data		*/
	size_t		len;		/* bytes of data currently in use	*/
};
73 
74 typedef struct Category_s
75 {
76 	Dtlink_t	link;
77 	Sfulong_t	count;
78 	unsigned int	index;
79 	Data_t		key;
80 	Data_t		lim[1];
81 } Category_t;
82 
83 struct Field_s; typedef struct Field_s Field_t;
84 
85 struct Field_s
86 {
87 	Field_t*	next;
88 	Rskey_t*	lim;
89 	int		mm;
90 	int		index;
91 	Category_t*	category;
92 	Sfulong_t	count;
93 	Sfulong_t	total;
94 	Data_t		absolute;
95 };
96 
97 typedef struct State_s
98 {
99 	Rsdisc_t	rsdisc;
100 	Dtdisc_t	dtdisc;
101 	Rskeydisc_t	kydisc;
102 	Dt_t*		categories;
103 	unsigned int	index;
104 	Field_t*	field;
105 	unsigned int	fields;
106 	Data_t		key;
107 	Vmalloc_t*	vm;
108 	int		absolute;
109 	int		count;
110 	Category_t*	all;
111 	Sfulong_t	total;
112 } State_t;
113 
114 static int
save(Vmalloc_t * vm,register Data_t * p,void * data,size_t len)115 save(Vmalloc_t* vm, register Data_t* p, void* data, size_t len)
116 {
117 	if (p->size < len)
118 	{
119 		p->size = roundof(len, 256);
120 		if (!(p->data = vmnewof(vm, p->data, char, p->size, 0)))
121 		{
122 			error(ERROR_SYSTEM|2, "out of space [save]");
123 			return -1;
124 		}
125 	}
126 	p->len = len;
127 	if (data)
128 		memcpy(p->data, data, len);
129 	return 0;
130 }
131 
132 static int
gleancmp(Dt_t * dt,void * a,void * b,Dtdisc_t * disc)133 gleancmp(Dt_t* dt, void* a, void* b, Dtdisc_t* disc)
134 {
135 	Category_t*	x = (Category_t*)a;
136 	Category_t*	y = (Category_t*)b;
137 
138 	message((-4, "gleancmp a:%d:%-.*s: b:%d:%-.*s:", x->key.len, x->key.len, x->key.data, y->key.len, y->key.len, y->key.data));
139 	if (x->key.len < y->key.len)
140 		return -1;
141 	if (x->key.len > y->key.len)
142 		return 1;
143 	return memcmp(x->key.data, y->key.data, x->key.len);
144 }
145 
/*
 * format size bytes at data as lower case hex, two digits per byte
 * the result is in a buffer obtained from fmtbuf()
 * used by the trace messages in glean()
 */

static char*
fmtdata(void* data, size_t size)
{
	size_t		i;
	char*		b;
	char*		s;
	unsigned char*	u = (unsigned char*)data;

	s = b = fmtbuf(2 * size + 1);
	for (i = 0; i < size; i++)
		s += sfsprintf(s, 3, "%02x", u[i]);

	/* nothing is written when size is 0, so terminate explicitly */

	*s = 0;
	return b;
}
158 
159 static int
glean(Rs_t * rs,int op,Void_t * data,Void_t * arg,Rsdisc_t * disc)160 glean(Rs_t* rs, int op, Void_t* data, Void_t* arg, Rsdisc_t* disc)
161 {
162 	State_t*		state = (State_t*)disc;
163 	register Rsobj_t*	r;
164 	register Category_t*	p;
165 	register Field_t*	f;
166 	Category_t		x;
167 	ssize_t			k;
168 	int			n;
169 	int			m;
170 	Data_t			t;
171 
172 	switch (op)
173 	{
174 	case RS_POP:
175 		if (state->absolute)
176 			for (f = state->field; f; f = f->next)
177 			{
178 				if (state->count)
179 					sfprintf(sfstdout, "%u/%I*u/%I*u ", f->category->index, sizeof(f->count), f->count, sizeof(f->total), f->total);
180 				sfwrite(sfstdout, f->absolute.data, f->absolute.len);
181 			}
182 		vmclose(state->vm);
183 		return 0;
184 	case RS_READ:
185 		r = (Rsobj_t*)data;
186 		x.key.data = r->key;
187 		x.key.len = r->keylen;
188 		if (state->categories && !(p = (Category_t*)dtsearch(state->categories, &x)) || !state->categories && !(p = state->all))
189 		{
190 			if (!(p = vmnewof(state->vm, 0, Category_t, 1, (state->fields - 1) * sizeof(Data_t) + r->keylen)))
191 			{
192 				error(ERROR_SYSTEM|2, "out of space [category]");
193 				return -1;
194 			}
195 			p->index = ++state->index;
196 			p->key.len = r->keylen;
197 			p->key.data = (char*)(p + 1) + (state->fields - 1) * sizeof(Data_t);
198 			memcpy(p->key.data, r->key, r->keylen);
199 			if (state->categories)
200 				dtinsert(state->categories, p);
201 			else
202 				state->all = p;
203 		}
204 		state->total++;
205 		p->count++;
206 		message((-2, "glean record p=%p %I*u/%I*u key='%-.*s' r:%d:%-.*s: '%-.*s'", p, sizeof(p->count), p->count, sizeof(state->total), state->total, r->keylen, r->key, x.key.len, x.key.len, x.key.data, r->datalen && r->data[r->datalen - 1] == '\n' ? r->datalen - 1 : r->datalen, r->data));
207 		m = 0;
208 		for (f = state->field; f; f = f->next)
209 		{
210 			if (f->lim->disc->defkeyf)
211 			{
212 				if ((k = f->lim->disc->keylen) <= 0)
213 					k = 4;
214 				k *= r->datalen;
215 				if (save(state->vm, &state->key, 0, k))
216 					return -1;
217 				if ((k = (*f->lim->disc->defkeyf)(NiL, r->data, r->datalen, state->key.data, state->key.size, f->lim->disc)) < 0)
218 					return -1;
219 				t.len = state->key.len = k;
220 				t.data = state->key.data;
221 			}
222 			else
223 			{
224 				t.data = r->data + f->lim->disc->key;
225 				if ((k = f->lim->disc->keylen) <= 0)
226 					k += r->datalen - f->lim->disc->key;
227 				t.len = k;
228 			}
229 			message((-1, "glean [%d] %c a:%d:%s:", f->index, f->mm, t.len, fmtdata(t.data, t.len)));
230 			if (!p->lim[f->index].data)
231 				n = f->mm == 'm' ? -1 : 1;
232 			else
233 			{
234 				message((-1, "glean [%d] %c b:%d:%s:", f->index, f->mm, p->lim[f->index].len, fmtdata(p->lim[f->index].data, p->lim[f->index].len)));
235 				if ((k = t.len) < p->lim[f->index].len)
236 					k = p->lim[f->index].len;
237 				if (!(n = memcmp(t.data, p->lim[f->index].data, k)))
238 					n = t.len - p->lim[f->index].len;
239 			}
240 			if (f->mm == 'm' ? (n < 0) : (n > 0))
241 			{
242 				if (f->lim->disc->defkeyf)
243 				{
244 					t = state->key;
245 					state->key = p->lim[f->index];
246 					p->lim[f->index] = t;
247 				}
248 				else if (save(state->vm, &p->lim[f->index], t.data, t.len))
249 					return -1;
250 				if (!state->absolute)
251 					m = 1;
252 				else if (save(state->vm, &f->absolute, r->data, r->datalen))
253 					return -1;
254 				else
255 				{
256 					f->category = p;
257 					f->count = p->count;
258 					f->total = state->total;
259 				}
260 			}
261 		}
262 		if (m)
263 		{
264 			if (state->count)
265 				sfprintf(sfstdout, "%u/%I*u/%I*u ", p->index, sizeof(p->count), p->count, sizeof(state->total), state->total);
266 			sfwrite(sfstdout, r->data, r->datalen);
267 		}
268 		return RS_DELETE;
269 	}
270 	return -1;
271 }
272 
273 Rsdisc_t*
rs_disc(Rskey_t * key,const char * options)274 rs_disc(Rskey_t* key, const char* options)
275 {
276 	register State_t*	state;
277 	register Field_t*	f;
278 	Vmalloc_t*		vm;
279 	int			i;
280 
281 	if (!(vm = vmopen(Vmdcheap, Vmbest, 0)) || !(state = vmnewof(vm, 0, State_t, 1, 0)))
282 		error(ERROR_SYSTEM|3, "out of space [state]");
283 	state->vm = vm;
284 	key->type &= ~RS_DATA;
285 	state->dtdisc.link = offsetof(Category_t, link);
286 	state->dtdisc.comparf = gleancmp;
287 	state->kydisc.version = RSKEY_VERSION;
288 	state->kydisc.errorf = errorf;
289 	state->rsdisc.eventf = glean;
290 	state->rsdisc.events = RS_READ|RS_POP;
291 	if ((key->keydisc->flags & RSKEY_KEYS) && !(state->categories = dtnew(vm, &state->dtdisc, Dtoset)))
292 		error(ERROR_SYSTEM|3, "out of space [dictionary]");
293 	if (options)
294 	{
295 		for (;;)
296 		{
297 			switch (i = optstr(options, usage))
298 			{
299 			case 0:
300 				break;
301 			case 'a':
302 				state->absolute = opt_info.num;
303 				continue;
304 			case 'c':
305 				state->count = opt_info.num;
306 				continue;
307 			case 'f':
308 				sfset(sfstdout, SF_LINE, 1);
309 				continue;
310 			case 'm':
311 			case 'M':
312 				if (opt_info.assignment == ':' || !(f = state->field) || f->mm != i)
313 				{
314 					if (!(f = vmnewof(vm, 0, Field_t, 1, 0)) || !(f->lim = rskeyopen(&state->kydisc, NiL)))
315 						error(ERROR_SYSTEM|3, "out of space");
316 					strcpy((char*)f->lim->tab, (char*)key->tab);
317 					f->lim->type = key->type;
318 					f->mm = i;
319 					f->index = state->fields++;
320 					f->next = state->field;
321 					state->field = f;
322 				}
323 				if (rskey(f->lim, opt_info.arg, 0))
324 					goto drop;
325 				continue;
326 			case '?':
327 				error(ERROR_USAGE|4, "%s", opt_info.arg);
328 				goto drop;
329 			case ':':
330 				error(2, "%s", opt_info.arg);
331 				goto drop;
332 			}
333 			break;
334 		}
335 		for (f = state->field; f; f = f->next)
336 			if (rskeyinit(f->lim))
337 				goto drop;
338 	}
339 	return &state->rsdisc;
340  drop:
341 	for (f = state->field; f; f = f->next)
342 		rskeyclose(f->lim);
343 	vmclose(vm);
344 	return 0;
345 }
346 
347 SORTLIB(glean)
348