/*********************************************************************** * * * This software is part of the ast package * * Copyright (c) 2011-2012 AT&T Intellectual Property * * and is licensed under the * * Eclipse Public License, Version 1.0 * * by AT&T Intellectual Property * * * * A copy of the License is available at * * http://www.eclipse.org/org/documents/epl-v10.html * * (with md5 checksum b35adb5213ca9657e911e9befb180842) * * * * Information and Software Systems Research * * AT&T Research * * Florham Park NJ * * * * Glenn Fowler * * * ***********************************************************************/ #pragma prototyped static const char sort_usage[] = "[+PLUGIN?\findex\f]" "[+DESCRIPTION?The sort query writes the sorted input records to the" " standard output. The unsorted record stream is passed to the" " next query, if any. The sort keys are the \afield\a operands," " or the raw record data if there are no operands.]" "[c:count?Prepend an integer count field of the number of records that " "compare equal. Records with count less that \athreshhold\a are " "omitted.]#?[threshold:=1]" "[r:reverse|invert?Reverse the sense of comparisons.]" "[u:unique?Keep only the first of multiple records that compare equal on " "all keys.]" "\n" "\n[ field ... ]\n" "\n" "[+CAVEATS?Currently all data is sorted in memory -- spillover to " "temporary files not implemented yet.]" ; #include #include #include struct State_s; typedef struct State_s State_t; struct State_s { Rsdisc_t sortdisc; Rsdisc_t uniqdisc; Rskeydisc_t keydisc; Rskey_t* sortkey; Rskey_t* uniqkey; Rs_t* sort; Rs_t* uniq; Cx_t* cx; Sfio_t* op; Sfio_t* sortstack; Sfio_t* uniqstack; Sfio_t* tmp; char* sortbase; char* uniqbase; void* data; Dssfile_t* file; Vmalloc_t* rm; Vmalloc_t* vm; size_t count; }; extern Dsslib_t dss_lib_sort; static ssize_t key(Rs_t* rs, unsigned char* data, size_t datasize, unsigned char* key, size_t keysize, Rsdisc_t* disc) { State_t* state = (State_t*)disc; Rskeyfield_t* field; Cxoperand_t r; unsigned char* k; unsigned char* e; k = key; e = k + keysize; for (field = state->sortkey->head; field; field = field->next) { if (cxcast(state->cx, &r, (Cxvariable_t*)field->user, state->cx->state->type_string, state->data, NiL)) return -1; k += field->coder(state->sortkey, field, (unsigned char*)r.value.string.data, r.value.string.size, k, e); } return k - key; } static ssize_t rev(Rs_t* rs, unsigned char* data, size_t datasize, unsigned char* key, size_t keysize, Rsdisc_t* disc) { State_t* state = (State_t*)disc; return state->sortkey->head->coder(state->sortkey, state->sortkey->head, data, datasize, key, key + keysize); } static int count(Rs_t* rs, int op, void* data, void* arg, Rsdisc_t* disc) { State_t* state = (State_t*)disc; Rsobj_t* r; Rsobj_t* q; char* s; ssize_t n; switch (op) { case RS_POP: break; case RS_WRITE: r = (Rsobj_t*)data; n = 1; for (q = r->equal; q; q = q->right) n++; if (n >= state->count) { n = sfprintf(state->uniqstack, "%I*u %-.*s", sizeof(n), n, r->datalen, r->data); s = stkfreeze(state->uniqstack, 0); if (rsprocess(state->uniq, s, -n) <= 0) { if (state->cx->disc->errorf) (*state->cx->disc->errorf)(state->cx, disc, ERROR_SYSTEM|2, "uniq record process error"); return -1; } } return RS_DELETE; default: return -1; } return 0; } static int sort_beg(Cx_t* cx, Cxexpr_t* expr, void* data, Cxdisc_t* disc) { char** argv = (char**)data; int errors = error_info.errors; int n; int uniq; char* s; char* t; char* num; State_t* state; Cxvariable_t* variable; Vmalloc_t* vm; char opt[2]; if (!(vm = vmopen(Vmdcheap, Vmlast, 0)) || !(state = vmnewof(vm, 0, State_t, 1, 0))) { if (vm) vmclose(vm); if (disc->errorf) (*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "out of space"); return -1; } state->vm = vm; state->keydisc.version = RSKEY_VERSION; state->keydisc.errorf = disc->errorf; if (!(state->sortkey = rskeyopen(&state->keydisc, &state->sortdisc))) goto bad; if (!(state->sort = rsnew(state->sortkey->disc))) goto bad; if (!(state->sortstack = stkopen(0))) goto bad; if (!(state->tmp = sfstropen())) goto bad; state->sortbase = stkptr(state->sortstack, 0); if (!(state->file = dssfopen(DSS(cx), "-", state->sortstack, DSS_FILE_WRITE, 0))) goto bad; sfprintf(cx->buf, "%s%s", strchr(dss_lib_sort.description, '['), sort_usage); s = sfstruse(cx->buf); uniq = 0; for (;;) { switch (optget(argv, s)) { case 0: break; case 'c': state->count = (size_t)opt_info.number; continue; case 'u': uniq = 1; continue; case '?': if (disc->errorf) { (*disc->errorf)(cx, disc, ERROR_USAGE|4, "%s", opt_info.arg); } else goto bad; continue; case ':': if (disc->errorf) (*disc->errorf)(cx, disc, 2, "%s", opt_info.arg); else goto bad; continue; default: opt[0] = opt_info.option[1]; opt[1] = 0; if (rskeyopt(state->sortkey, opt, 1)) goto bad; continue; } break; } if (error_info.errors > errors) goto bad; argv += opt_info.index; n = 0; num = state->sortkey->head && state->sortkey->head->rflag ? "nr" : "n"; while (s = *argv++) { if (t = strchr(s, '-')) { sfwrite(cx->buf, s, t - s); s = sfstruse(cx->buf); t++; } else t = 0; if (!(variable = cxvariable(cx, s, NiL, disc))) goto bad; if (rskey(state->sortkey, t ? t : cxisnumber(variable->type) ? num : "", 0)) goto bad; state->sortkey->tail->user = variable; n = 1; } if (uniq) { state->sortkey->type &= ~RS_DATA; state->sortkey->type |= RS_UNIQ; } if (state->count) { state->sortdisc.events |= RS_WRITE; state->sortdisc.eventf = count; if (!(state->uniqstack = stkopen(0))) goto bad; state->uniqbase = stkptr(state->uniqstack, 0); if (!(state->uniqkey = rskeyopen(&state->keydisc, &state->uniqdisc))) goto bad; if (!(state->uniq = rsnew(state->uniqkey->disc))) goto bad; if (rskey(state->uniqkey, "1n", 0)) goto bad; if (rskeyinit(state->uniqkey)) { if (disc->errorf) (*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "uniq key error"); goto bad; } if (rsinit(state->uniq, state->uniqkey->meth, state->uniqkey->procsize, state->uniqkey->type, state->uniqkey)) { if (disc->errorf) (*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "uniq initialization error"); goto bad; } } if (rskeyinit(state->sortkey)) { if (disc->errorf) (*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "sort key error"); goto bad; } if (n) { state->sortdisc.defkeyf = key; state->sortdisc.key = 1; } else if (state->sortkey->head->rflag) { state->sortdisc.defkeyf = rev; state->sortdisc.key = 1; } if (rsinit(state->sort, state->sortkey->meth, state->sortkey->procsize, state->sortkey->type, state->sortkey)) { if (disc->errorf) (*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "sort initialization error"); goto bad; } state->cx = cx; state->op = expr->op; expr->data = state; return 0; bad: if (state->sort) rsclose(state->sort); if (state->uniq) rsclose(state->uniq); if (state->sortkey) rskeyclose(state->sortkey); if (state->uniqkey) rskeyclose(state->uniqkey); if (state->file) dssfclose(state->file); else if (state->sortstack) stkclose(state->sortstack); if (state->uniqstack) stkclose(state->uniqstack); if (state->tmp) sfstrclose(state->tmp); vmclose(state->vm); return -1; } static int sort_act(Cx_t* cx, Cxexpr_t* expr, void* data, Cxdisc_t* disc) { State_t* state = (State_t*)expr->data; Dssrecord_t* record = (Dssrecord_t*)data; char* s; ssize_t n; if (dssfwrite(state->file, record)) return -1; n = stktell(state->file->io); s = stkfreeze(state->file->io, 0); state->data = data; if (rsprocess(state->sort, s, -n) <= 0) { if (disc->errorf) (*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "sort record process error"); return -1; } return 0; } static int sort_end(Cx_t* cx, Cxexpr_t* expr, void* data, Cxdisc_t* disc) { State_t* state = (State_t*)expr->data; int r; r = 0; if (rswrite(state->sort, expr->op, RS_OTEXT)) { if (disc->errorf) (*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "sort output error"); r = -1; } while (rsdisc(state->sort, NiL, RS_POP)); if (rsclose(state->sort)) { if (disc->errorf) (*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "sort close error"); r = -1; } if (rskeyclose(state->sortkey)) { if (disc->errorf) (*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "sort key error"); r = -1; } if (state->uniq) { if (rswrite(state->uniq, expr->op, RS_OTEXT)) { if (disc->errorf) (*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "uniq output error"); r = -1; } while (rsdisc(state->uniq, NiL, RS_POP)); if (rsclose(state->uniq)) { if (disc->errorf) (*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "uniq close error"); r = -1; } if (rskeyclose(state->uniqkey)) { if (disc->errorf) (*disc->errorf)(cx, disc, ERROR_SYSTEM|2, "uniq key error"); r = -1; } stkclose(state->uniqstack); } dssfclose(state->file); sfstrclose(state->tmp); vmclose(state->vm); return r; } static Cxquery_t queries[] = { { "sort", "sort records to the standard output", CXH, sort_beg, 0, sort_act, sort_end }, {0} }; Dsslib_t dss_lib_sort = { "sort", "sort query" "[-1lms5P?\n@(#)$Id: dss sort query (AT&T Research) 2011-10-18 $\n]" USAGE_LICENSE, CXH, 0, 0, 0, 0, 0, 0, &queries[0] };