1 /***********************************************************************
2 * *
3 * This software is part of the ast package *
4 * Copyright (c) 1996-2013 AT&T Intellectual Property *
5 * and is licensed under the *
6 * Eclipse Public License, Version 1.0 *
7 * by AT&T Intellectual Property *
8 * *
9 * A copy of the License is available at *
10 * http://www.eclipse.org/org/documents/epl-v10.html *
11 * (with md5 checksum b35adb5213ca9657e911e9befb180842) *
12 * *
13 * Information and Software Systems Research *
14 * AT&T Research *
15 * Florham Park NJ *
16 * *
17 * Glenn Fowler <glenn.s.fowler@gmail.com> *
18 * Phong Vo <phongvo@gmail.com> *
19 * Doug McIlroy <doug@research.bell-labs.com> *
20 * *
21 ***********************************************************************/
22 #pragma prototyped
23
24 /*
25 * sort main
26 *
27 * algorithm and interface
28 *
29 * Glenn Fowler
30 * Phong Vo
31 * AT&T Research
32 *
33 * key coders
34 *
35 * Doug McIlroy
36 * Bell Laboratories
37 */
38
/*
 * option description string handed to optget() by parse()
 *
 * the \fcodesets\f and \fmethods\f items use the same names that
 * optinfo() tests for, so presumably optget() calls optinfo() to
 * generate those two lists at run time -- confirm against optget(3)
 */

static const char usage[] =
"[-n?\n@(#)$Id: sort (AT&T Research) 2013-09-19 $\n]"
USAGE_LICENSE
"[+NAME?sort - sort and/or merge files]"
"[+DESCRIPTION?\bsort\b sorts lines of all the \afiles\a together and "
"writes the result on the standard output. The file name \b-\b means the "
"standard input. If no files are named, the standard input is sorted.]"
"[+?The default sort key is an entire line. Default ordering is "
"lexicographic by bytes in machine collating sequence. The ordering is "
"affected globally by the locale and/or the following options, one or "
"more of which may appear. See \brecsort\b(3) for details.]"
"[+?For backwards compatibility the \b-o\b option is allowed in any file "
"operand position when neither the \b-c\b nor the \b--\b options are "
"specified.]"
"[k:key?Restrict the sort key to a string beginning at \apos1\a and "
"ending at \apos2\a. \apos1\a and \apos2\a each have the form \am.n\a, "
"counting from 1, optionally followed by one or more of the flags "
"\bCMbdfginprZ\b; \bm\b counts fields from the beginning of the line and "
"\bn\b counts characters from the beginning of the field. If any flags "
"are present they override all the global ordering options for this key. "
"If \a.n\a is missing from \apos1\a, it is taken to be 1; if missing "
"from \apos2\a, it is taken to be the end of the field. If \apos2\a is "
"missing, it is taken to be end of line. The second form specifies a "
"fixed record length \areclen\a, and the last form specifies a fixed "
"field at byte position \aposition\a (counting from 1) of \alength\a "
"bytes. The obsolescent \breclen:fieldlen:offset\b (byte offset from 0) "
"is also accepted.]:[pos1[,pos2]]|.reclen|.position.length]]]]]"
"[K:oldkey?Specified in pairs: \b-K\b \apos1\a \b-K\b \apos2\a, where "
"positions count from 0.]# [pos]"
"[R:record|recfmt?Sets the record format to \aformat\a; newlines will be "
"treated as normal characters. The formats are:]:[format]"
"{"
"[+d[\aterminator\a]]?Variable length with record \aterminator\a "
"character, \b\\n\b by default.]"
"[+[f]]\areclen\a?Fixed record length \areclen\a.]"
"[+v[op...]]?Variable length. \bh4o0z2bi\b (4 byte IBM V format "
"descriptor) if \aop\a are omitted. \aop\a may be a combination "
"of:]"
"{"
"[+h\an\a?Header size is \an\a bytes (default 4).]"
"[+o\an\a?Size offset in header is \an\a bytes (default "
"0).]"
"[+z\an\a?Size length is \an\a bytes (default "
"min(\bh\b-\bo\b,2)).]"
"[+b?Size is big-endian (default).]"
"[+l?Size is little-endian (default \bb\b).]"
"[+i?Record length includes header (default).]"
"[+n?Record length does not include header (default "
"\bi\b).]"
"}"
"[+%?If the record format is not otherwise specified, and the "
"any input file name, from left to right, ends with "
"\b%\b\aformat\a or \b%\b\aformat\a\b.\b* then the record format "
"is set to \aformat\a. In addition, the \b-o\b path, if "
"specified and if it does not contain \b%\b and if it names a "
"regular file, is renamed to contain the input \b%\b\aformat\a.]"
"[+-?The first block of the first input file is sampled to check "
"for \bv\b variable length and \bf\b fixed length format "
"records. Not all formats are detected. \bsort\b exits with an "
"error diagnostic if the record format cannot be determined from "
"the sample.]"
"}"
"[b:ignorespace|ignore-leading-blanks?Ignore leading white space (spaces "
"and tabs) in field comparisons.]"
"[d:dictionary?`Phone directory' order: only letters, digits and white "
"space are significant in string comparisons.]"
"[E:codeset|convert?The field data codeset is \acodeset\a or the field "
"data must be converted from the \afrom\a codeset to the \ato\a codeset. "
"The codesets are:]:[codeset|from::to]"
"{\fcodesets\f}"
"[f:fold|ignorecase?Fold lower case letters onto upper case.]"
"[h:scaled|human-readable?Compare numbers scaled with IEEE 1541-2002 "
"suffixes.]"
"[i:ignorecontrol?Ignore characters outside the ASCII range 040-0176 in "
"string comparisons.]"
"[J:shuffle|jumble?Do a random shuffle of the sort keys. \aseed\a "
"specifies a pseudo random number generator seed. A \aseed\a of 0 "
"generates a seed based on time and pid.]#[seed]"
"[n:numeric?An initial numeric string, consisting of optional white "
"space, optional sign, and a nonempty string of digits with optional "
"decimal point, is sorted by value.]"
"[g:floating?Numeric, like \b-n\b, with \be\b-style exponents allowed.]"
"[p:bcd|packed-decimal?Compare packed decimal (bcd) numbers with "
"trailing sign.]"
"[M:months?Compare as month names. The first three characters after "
"optional white space are folded to lower case and compared. Invalid "
"fields compare low to \bjan\b.]"
"[r:reverse|invert?Reverse the sense of comparisons.]"
"[t:tabs?`Tab character' separating fields is \achar\a.]:[tab-char]"
"[c:check?Check that the single input file is sorted according to the "
"ordering rules; give no output on the standard output. If the input "
"is out of sort then write one diagnostic line on the standard error "
"and exit with code \b1\b.]"
"[C:silent-check?Like \b--check\b except no diagnostic is written.]"
"[j:processes|nproc|jobs?Use up to \ajobs\a separate processes to sort "
"the input. The current implementation still uses one process for the "
"final merge phase; improvements are planned.]#[processes]"
"[m:merge?Merge; the input files are already sorted.]"
"[u:unique?Unique. Keep only the first of multiple records that compare "
"equal on all keys. Implies \b-s\b.]"
"[s:stable?Stable sort. When all keys compare equal, preserve input "
"order. The default is \b--nostable\b (\aunstable\a sort): when all "
"keys compare equal, break the tie by using the entire record, ignoring "
"all but the \b-r\b option.]"
"[o:output?Place output in the designated \afile\a instead of on the "
"standard output. This file may be the same as one of the inputs. The "
"\afile\a \b-\b names the standard output. The option may appear among "
"the file arguments, except after \b--\b.]:[output]"
"[l:library?Load the external sort discipline \alibrary\a with optional "
"comma separated \aname=value\a arguments. Libraries are loaded, in left "
"to right order, after the sort method has been "
"initialized.]:[library[,name=value...]]]"
"[T:tempdir?Put temporary files in \atempdir\a.]:[tempdir:=/usr/tmp]"
"[L:list?List the available sort methods. See the \b-x\b option.]"
"[x:method?Specify the sort method to apply:]:[method:=rasp]"
"{\fmethods\f} [v:verbose?Trace the sort progress on the standard "
"error.]"
"[P:plugins?List plugin information for each \afile\a operand in "
"--\astyle\a on the standard error. If no \afile\a operands are "
"specified then the first instance of each \bsort\b plugin installed on "
"\b$PATH\b or a sibling dir on \b$PATH\b is listed. The special "
"\astyle\a \blist\b lists a line on the standard output for each plugin "
"with the name, a tab character, and plugin specific command line "
"options parameterized by \b${style}\b (suitable for \beval\b'ing in "
"\bsh\b(1).)]:[style:=list|man|html|nroff|usage]"
"[Z:zd|zoned-decimal?Compare zoned decimal (ZD) numbers with embedded "
"trailing sign.]"
"[z:size|zip?Suggest using the specified number of bytes of internal "
"store to tune performance. Power of 2 and power of 10 size suffixes are "
"accepted. Type is a single character and may be one of:]:[type[size]]]"
"{"
"[+a?Buffer alignment.]"
"[+b?Input reserve buffer size.]"
"[+c?Input chunk size; sort chunks of this size and disable "
"merge.]"
"[+i?Input buffer size.]"
"[+m?Maximum number of intermediate merge files.]"
"[+p?Input sort size; sort chunks of this size before merge.]"
"[+o?Output buffer size.]"
"[+r?Maximum record size.]"
"[+I?Decompress the input if it is compressed.]"
"[+O?\bgzip\b(1) compress the output.]"
"}"
"[X:test?Enables implementation defined test code. Some or all of these "
"may be disabled.]:[test]"
"{"
"[+dump?List detailed information on the option settings.]"
"[+io?List io file paths.]"
"[+keys?List the canonical key for each record.]"
"[+read?Force input file read by disabling memory mapping.]"
"[+show?Show setup information and exit before sorting.]"
"[+test?Immediatly exit with status 0; used to verify this "
"implementation]"
"}"
"[D:debug?Sets the debug trace level. Higher levels produce more "
"output.]# [level]"
"[S|y?Equivalent to \b-zp\b\asize\a; if \asize\a has no suffix then \bki\b "
"is assumed.]:[size]"

"\n"
"\n[ file ... ]\n"
"\n"

"[+?+\apos1\a -\apos2\a is the classical alternative to \b-k\b, with "
"counting from 0 instead of 1, and pos2 designating next-after-last "
"instead of last character of the key. A missing character count in "
"\apos2\a means 0, which in turn excludes any \b-t\b tab character from "
"the end of the key. Thus +1 -1.3 is the same as \b-k\b 2,2.3 and +1r -3 "
"is the same as \b-k\b 2r,3.]"
"[+?Under option \b-t\b\ax\a fields are strings separated by \ax\a; "
"otherwise fields are non-empty strings separated by white space. White "
"space before a field is part of the field, except under option \b-b\b. "
"A \bb\b flag may be attached independently to \apos1\a and \apos2\a.]"
"[+?When there are multiple sort keys, later keys are compared only "
"after all earlier keys compare equal. Except under option \b-s\b, lines "
"with all keys equal are ordered with all bytes significant. \b-S\b "
"turns off \b-s\b, the last occurrence, left-to-right, takes affect.]"
"[+?Sorting is done by a method determined by the \b-x\b option. \b-L\b "
"lists the available methods. rasp (radix+splay-tree) is the default and "
"current all-around best.]"
"[+?Single-letter options may be combined into a single string, such as "
"\b-cnrt:\b. The option combination \b-di\b and the combination of "
"\b-n\b with any of \b-diM\b are improper. Posix argument conventions "
"are supported.]"
"[+?Options \b-b\b, \b-c\b, \b-d\b, \b-f\b, \b-i\b, \b-k\b, \b-m\b, "
"\b-n\b, \b-o\b, \b-r\b, \b-t\b, and \b-u\b are in the Posix and/or "
"X/Open standards.]"

"[+DIAGNOSTICS?\asort\a comments and exits with non-zero status for "
"various trouble conditions and for disorder discovered under option "
"\b-c\b.]"
"[+SEE ALSO?\bcomm\b(1), \bjoin\b(1), \buniq\b(1), \brecsort\b(3)]"
"[+CAVEATS?The never-documented default \apos1\a=0 for cases such as "
"\bsort -1\b has been abolished. An input file overwritten by \b-o\b is "
"not replaced until the entire output file is generated in the same "
"directory as the input, at which point the input is renamed.]"
;
236
237 #include <sfio_t.h>
238 #include <ast.h>
239 #include <error.h>
240 #include <debug.h>
241 #include <ctype.h>
242 #include <fs3d.h>
243 #include <ls.h>
244 #include <option.h>
245 #include <recsort.h>
246 #include <recfmt.h>
247 #include <sfdcgzip.h>
248 #include <vmalloc.h>
249 #include <wait.h>
250 #include <iconv.h>
251 #include <dlldefs.h>
252
/*
 * input buffer dimensions, all derived from INMIN
 */

#define INMIN		(1024)		/* min input buffer size	*/
#define INBRK		(64*INMIN)	/* default heap increment	*/
#define INMAX		(1024*INBRK)	/* max input buffer size	*/
#define INREC		(16*INMIN)	/* record begin chunk size	*/

/*
 * -X test bit masks; parse() sets or clears these in key->test
 * and copies the result to sp->test
 */

#define TEST_dump	0x80000000	/* dump the state before sort	*/
#define TEST_io		0x40000000	/* dump io files		*/
#define TEST_keys	0x20000000	/* dump keys			*/
#define TEST_read	0x10000000	/* force sfread()		*/
#define TEST_show	0x08000000	/* show but don't do		*/
#define TEST_reserve	0x04000000	/* force sfreserve()		*/

/*
 * true if path s names the standard input/output; a null s counts too
 */

#define pathstdin(s)	(!(s)||streq(s,"-")||streq(s,"/dev/stdin")||streq(s,"/dev/fd/0"))
#define pathstdout(s)	(!(s)||streq(s,"-")||streq(s,"/dev/stdout")||streq(s,"/dev/fd/1"))
267
/*
 * file part read state; embeds an sfio discipline as its first member
 * NOTE(review): no use of Part_t is visible in this part of the file --
 * presumably it limits a stream to [offset,offset+size); confirm at the
 * point of use
 */

typedef struct Part_s
{
	Sfdisc_t	disc;		/* sfio discipline		*/
	off_t		offset;		/* file offset			*/
	off_t		size;		/* total size at offset		*/
	off_t		remain;		/* read size remaining		*/
} Part_t;
275
/*
 * one entry of the multi-process job table (Sort_t.jobs);
 * init() allocates key->nproc entries and fills them in when
 * more than one process is requested
 */

typedef struct Job_s
{
	off_t		offset;		/* file part offset		*/
	off_t		size;		/* file part size		*/
	size_t		chunk;		/* file part chunk		*/
	int		intermediates;	/* number of intermediate files	*/
} Job_t;
283
/*
 * sort(1) global state; init() zeroes the whole struct before use
 *
 * NOTE(review): dumpkey() casts RSKEYDISC(disc) to Sort_t*, so disc
 * presumably has to remain the first member -- confirm against the
 * RSKEYDISC() definition
 */

typedef struct Sort_s
{
	Rskeydisc_t	disc;		/* rskey discipline		*/
	Rs_t*		rec;		/* rsopen() context		*/
	Rskey_t*	key;		/* rskeyopen() context		*/
	Rsdefkey_f	defkeyf;	/* real defkeyf if TEST_keys	*/
	Sfio_t*		tp;		/* TEST_keys tmp stream		*/
	Sfio_t*		op;		/* output stream		*/
	Job_t*		jobs;		/* multi-proc job table		*/
	char*		overwrite;	/* -o input overwrite tmp file	*/
	char*		buf;		/* input buffer			*/
	Sfio_t*		opened;		/* fileopen() peek stream	*/
	size_t		cur;		/* input buffer index		*/
	size_t		hit;		/* input buffer index overflow	*/
	size_t		end;		/* max input buffer index	*/
	size_t		bufsize;	/* input reserve buffer size	*/
	off_t		total;		/* total size of single file	*/
	unsigned long	test;		/* test bit mask		*/
	int		child;		/* in child process		*/
	int		chunk;		/* chunk the input (no merge)	*/
	int		hadstdin;	/* already has - on input	*/
	int		map;		/* sfreserve() input		*/
	int		mfiles;		/* multi-stage files[] count	*/
	int		nfiles;		/* files[] count		*/
	int		xfiles;		/* max files[] count		*/
	int		preserve;	/* rename() tmp output to input	*/
	int		single;		/* one input file		*/
	int		verbose;	/* trace main actions		*/
	int		zip;		/* sfdcgzip SF_* flags		*/
	/* stream table; xfiles starts at, and -zm is capped to, its size */
	Sfio_t*		files[OPEN_MAX > 68 ? 64 : (OPEN_MAX-4)];
} Sort_t;
315
316 /*
317 * optget() info discipline function
318 */
319
320 static int
optinfo(Opt_t * op,Sfio_t * sp,const char * s,Optdisc_t * dp)321 optinfo(Opt_t* op, Sfio_t* sp, const char* s, Optdisc_t* dp)
322 {
323 register iconv_list_t* ic;
324 register int n;
325
326 if (streq(s, "codesets"))
327 {
328 n = 0;
329 for (ic = iconv_list(NiL); ic; ic = iconv_list(ic))
330 if (ic->ccode >= 0)
331 n += sfprintf(sp, "[%c:%s?%s]", ic->match[ic->match[0] == '('], ic->name, ic->desc);
332 return n;
333 }
334 else if (streq(s, "methods"))
335 return rskeylist(NiL, sp, 1);
336 return 0;
337 }
338
339 /*
340 * handle RS_VERIFY event
341 */
342
343 static int
verify(Rs_t * rs,int op,Void_t * data,Void_t * arg,Rsdisc_t * disc)344 verify(Rs_t* rs, int op, Void_t* data, Void_t* arg, Rsdisc_t* disc)
345 {
346 if (op == RS_VERIFY)
347 error(3, "disorder at record %lld", (Sflong_t)((Rsobj_t*)data)->order);
348 return 0;
349 }
350
351 static int
verify_silent(Rs_t * rs,int op,Void_t * data,Void_t * arg,Rsdisc_t * disc)352 verify_silent(Rs_t* rs, int op, Void_t* data, Void_t* arg, Rsdisc_t* disc)
353 {
354 if (op == RS_VERIFY)
355 exit(1);
356 return 0;
357 }
358
359 /*
360 * return read stream for path
361 */
362
363 static Sfio_t*
fileopen(register Sort_t * sp,const char * path)364 fileopen(register Sort_t* sp, const char* path)
365 {
366 Sfio_t* fp;
367
368 if (fp = sp->opened)
369 sp->opened = 0;
370 else
371 {
372 if (pathstdin(path))
373 {
374 if (sp->hadstdin)
375 error(3, "%s: can only read once", path);
376 sp->hadstdin = 1;
377 fp = sfstdin;
378 }
379 else if (!(fp = sfopen(NiL, path, "r")))
380 error(ERROR_SYSTEM|3, "%s: cannot open", path);
381 if (rsfileread(sp->rec, fp, path))
382 {
383 if (fp != sfstdin)
384 sfclose(fp);
385 return 0;
386 }
387 sfset(fp, SF_SHARE, 0);
388 if (sp->zip & SF_READ)
389 sfdcgzip(fp, 0);
390 }
391 return fp;
392 }
393
/*
 * do-nothing exit function; showplugins() installs it as
 * error_info.exit to prevent ERROR_USAGE|4 messages from exiting
 */

static void
noexit(int code)
{
	(void)code;
}
402
403 /*
404 * list info for one plugin on the standard error
405 */
406
407 static void
showlib(Sort_t * sp,Rskey_t * kp,const char * name,const char * style)408 showlib(Sort_t* sp, Rskey_t* kp, const char* name, const char* style)
409 {
410 char* args;
411 char buf[128];
412
413 if (style)
414 sfsprintf(args = buf, sizeof(buf), "%s,%s", name, style);
415 else
416 args = (char*)name;
417 if (!rslib(sp->rec, kp, args, RS_IGNORE) && !style)
418 sfprintf(sfstdout, "%s\t--library=%s,${style}\n", name, name);
419 }
420
421 /*
422 * list info for all [selected] plugins on the standard error
423 */
424
425 static int
showplugins(Sort_t * sp,Rskey_t * kp,const char * style)426 showplugins(Sort_t* sp, Rskey_t* kp, const char* style)
427 {
428 Dllscan_t* dls;
429 Dllent_t* dle;
430 char* name;
431 void (*oexit)(int);
432
433 if (streq(style, "list"))
434 style = 0;
435 else
436 {
437 oexit = error_info.exit;
438 error_info.exit = noexit;
439 }
440 if (*kp->input)
441 while (name = *kp->input++)
442 showlib(sp, kp, name, style);
443 else if (dls = dllsopen("sort", NiL, NiL))
444 {
445 while (dle = dllsread(dls))
446 showlib(sp, kp, dle->name, style);
447 dllsclose(dls);
448 }
449 if (style)
450 error_info.exit = oexit;
451 return 0;
452 }
453
/*
 * -l library list node; parse() appends one per -l option in
 * command line order, then loads each with rslib() and frees it
 */

struct Lib_s; typedef struct Lib_s Lib_t;

struct Lib_s
{
	Lib_t*		next;		/* next -l library, 0 at end	*/
	char*		name;		/* -l option argument		*/
};
461
462 /*
463 * process argv as in sort(1)
464 */
465
466 static int
parse(register Sort_t * sp,char ** argv)467 parse(register Sort_t* sp, char** argv)
468 {
469 register Rskey_t* key = sp->key;
470 register int n;
471 register char* s;
472 char* e;
473 char* p;
474 char** a;
475 char** v;
476 size_t z;
477 int i;
478 int map;
479 char* plugins = 0;
480 Lib_t* firstlib = 0;
481 Lib_t* lastlib = 0;
482 Lib_t* lib;
483 Recfmt_t r;
484 Mbstate_t q;
485 int obsolescent = 1;
486 char opt[64];
487 Optdisc_t optdisc;
488 struct stat st;
489
490 optinit(&optdisc, optinfo);
491 for (;;)
492 {
493 switch (optget(argv, usage))
494 {
495 case 0:
496 break;
497 case 'c':
498 case 'C':
499 obsolescent = 0;
500 key->meth = Rsverify;
501 key->disc->events = RS_VERIFY;
502 key->disc->eventf = opt_info.option[1] == 'C' ? verify_silent : verify;
503 continue;
504 case 'E':
505 case 'J':
506 sfsprintf(opt, sizeof(opt), "%c%s", opt_info.option[1], opt_info.arg);
507 if (rskeyopt(key, opt, 1))
508 return 0;
509 continue;
510 case 'j':
511 key->nproc = opt_info.num;
512 continue;
513 case 'k':
514 if (rskey(key, opt_info.arg, 0))
515 return -1;
516 continue;
517 case 'l':
518 if (!(lib = newof(0, Lib_t, 1, 0)))
519 error(ERROR_SYSTEM|3, "out of space");
520 lib->name = opt_info.arg;
521 if (lastlib)
522 lastlib = lastlib->next = lib;
523 else
524 firstlib = lastlib = lib;
525 continue;
526 case 'm':
527 key->merge = !!opt_info.num;
528 continue;
529 case 'o':
530 key->output = opt_info.arg;
531 continue;
532 case 's':
533 if (opt_info.num)
534 key->type &= ~RS_DATA;
535 else
536 key->type |= RS_DATA;
537 continue;
538 case 't':
539 if (key->tab[0])
540 error(1, "%s: %s conflicts with %s", opt_info.option, *opt_info.arg, key->tab);
541 mbtinit(&q);
542 if ((n = mbtsize(opt_info.arg, MB_LEN_MAX, &q)) < 1)
543 {
544 error(1, "%s: %s: invalid tab character", opt_info.option, opt_info.arg);
545 n = 0;
546 }
547 if (*(opt_info.arg + n) || n >= sizeof(key->tab))
548 error(1, "%s: %s: single character expected", opt_info.option, opt_info.arg);
549 memcpy(key->tab, opt_info.arg, n);
550 key->tab[n] = 0;
551 continue;
552 case 'u':
553 key->type &= ~RS_DATA;
554 key->type |= RS_UNIQ;
555 continue;
556 case 'v':
557 key->verbose = !!opt_info.num;
558 sp->verbose = key->verbose || (key->test & TEST_show);
559 continue;
560 case 'x':
561 if (!(key->meth = rskeymeth(key, opt_info.arg)))
562 error(2, "%s: unknown method", opt_info.arg);
563 continue;
564 case 'z':
565 if (isalpha(n = *(s = opt_info.arg)))
566 s++;
567 else
568 n = 'r';
569 size:
570 z = strton(s, &e, NiL, 1);
571 if (*e == '%')
572 {
573 error(2, "%s %c%s: %% not supported -- do you really want that much memory?", opt_info.option, n, s);
574 return -1;
575 }
576 if (*e || z < ((n == 'm' || n == 'o' || n == 'r' || isupper(n)) ? 0 : 512))
577 {
578 error(2, "%s %c%s: invalid size", opt_info.option, n, s);
579 return -1;
580 }
581 switch (n)
582 {
583 case 'a':
584 key->alignsize = z;
585 break;
586 case 'b':
587 sp->bufsize = z;
588 break;
589 case 'c':
590 sp->chunk = 1;
591 key->alignsize = key->insize = z;
592 break;
593 case 'i':
594 key->insize = z;
595 break;
596 case 'm':
597 if (z <= 0 || z > elementsof(sp->files))
598 z = elementsof(sp->files);
599 sp->xfiles = z;
600 break;
601 case 'p':
602 key->procsize = z;
603 break;
604 case 'o':
605 key->outsize = z;
606 break;
607 case 'r':
608 key->recsize = z;
609 break;
610 case 'I':
611 sp->zip |= SF_READ;
612 break;
613 case 'O':
614 sp->zip |= SF_WRITE;
615 break;
616 }
617 continue;
618 case 'D':
619 error_info.trace = -opt_info.num;
620 continue;
621 case 'K':
622 if (opt_info.offset)
623 {
624 opt_info.offset = 0;
625 opt_info.index++;
626 }
627 if (rskey(key, opt_info.arg, *opt_info.option))
628 return -1;
629 continue;
630 case 'L':
631 rskeylist(key, sfstdout, 0);
632 exit(0);
633 case 'P':
634 plugins = opt_info.arg;
635 continue;
636 case 'R':
637 key->disc->data = recstr(opt_info.arg, &e);
638 if (*e)
639 {
640 error(2, "%s: invalid record format", opt_info.arg);
641 return -1;
642 }
643 continue;
644 case 'S':
645 case 'y':
646 n = 'p';
647 s = opt_info.arg;
648 if (*s && *(e = s + strlen(s) - 1) != '%' && !isalpha(*e))
649 {
650 sfsprintf(opt, sizeof(opt), "%ski", s);
651 s = opt;
652 }
653 goto size;
654 case 'T':
655 pathtemp(NiL, 0, opt_info.arg, "/TMPPATH", NiL);
656 continue;
657 case 'X':
658 s = opt_info.arg;
659 opt_info.num = strton(s, &e, NiL, 1);
660 if (*e)
661 {
662 if (streq(s, "dump"))
663 opt_info.num = TEST_dump;
664 else if (streq(s, "io"))
665 opt_info.num = TEST_io;
666 else if (streq(s, "keys"))
667 opt_info.num = TEST_keys;
668 else if (streq(s, "read"))
669 opt_info.num = TEST_read;
670 else if (streq(s, "reserve"))
671 opt_info.num = TEST_reserve;
672 else if (streq(s, "show"))
673 opt_info.num = TEST_show;
674 else if (streq(s, "test"))
675 {
676 sfprintf(sfstdout, "ok\n");
677 exit(0);
678 }
679 else
680 error(1, "%s: unknown test", s);
681 }
682 if (*opt_info.option == '+')
683 key->test &= ~opt_info.num;
684 else
685 key->test |= opt_info.num;
686 sp->test = key->test;
687 sp->verbose = key->verbose || (key->test & TEST_show);
688 continue;
689 case '?':
690 error(ERROR_USAGE|4, "%s", opt_info.arg);
691 return 1;
692 case ':':
693 error(2, "%s", opt_info.arg);
694 return -1;
695 default:
696 opt[0] = opt_info.option[1];
697 opt[1] = 0;
698 if (rskeyopt(key, opt, 1))
699 return 0;
700 continue;
701 }
702 break;
703 }
704 argv += opt_info.index;
705 if (obsolescent && (opt_info.index <= 1 || !streq(*(argv - 1), "--")))
706 {
707 /*
708 * check for obsolescent `-o output' after first file operand
709 */
710
711 a = v = argv;
712 while (s = *a++)
713 {
714 if (*s == '-' && *(s + 1) == 'o')
715 {
716 if (!*(s += 2) && !(s = *a++))
717 {
718 error(2, "-o: output argument expected");
719 break;
720 }
721 key->output = s;
722 }
723 else
724 *v++ = s;
725 }
726 *v = 0;
727 }
728 key->input = argv;
729
730 /*
731 * disciplines have the opportunity to modify key info
732 */
733
734 while (lib = firstlib)
735 {
736 if (rslib(sp->rec, key, lib->name, 0))
737 return 1;
738 firstlib = firstlib->next;
739 free(lib);
740 }
741
742 /*
743 * plugins list bails early
744 */
745
746 if (plugins)
747 exit(showplugins(sp, key, plugins));
748
749 /*
750 * record format chicanery
751 */
752
753 if (map = RECTYPE(key->disc->data) == REC_method && REC_M_INDEX(key->disc->data) == REC_M_path)
754 for (n = 0, i = -1; p = key->input[n]; n++)
755 if (s = strrchr(p, '%'))
756 {
757 r = recstr(s + 1, &e);
758 if (!*e || *e == '.' && e > (s + 1))
759 {
760 if (r != key->disc->data && i >= 0 && (RECTYPE(r) != REC_variable || RECTYPE(key->disc->data) != REC_variable || REC_V_ATTRIBUTES(r) != REC_V_ATTRIBUTES(key->disc->data)))
761 {
762 error(2, "%s: format %s incompatible with %s format %s", p, fmtrec(r, 0), key->input[i], fmtrec(key->disc->data, 0));
763 return 1;
764 }
765 if (RECTYPE(r) != REC_variable || RECTYPE(key->disc->data) != REC_variable || REC_V_SIZE(key->disc->data) < REC_V_SIZE(r))
766 key->disc->data = r;
767 i = n;
768 }
769 }
770 if (RECTYPE(key->disc->data) == REC_method && ((n = REC_M_INDEX(key->disc->data)) == REC_M_path || n == REC_M_data))
771 {
772 if ((sp->opened = fileopen(sp, key->input[0])) && (s = sfreserve(sp->opened, SF_UNBOUND, SF_LOCKR)))
773 {
774 struct stat st;
775
776 z = sfvalue(sp->opened);
777 if (fstat(sffileno(sp->opened), &st) || st.st_size < z)
778 st.st_size = 0;
779 key->disc->data = recfmt(s, z, st.st_size);
780 sfread(sp->opened, s, 0);
781 }
782 else
783 {
784 z = sp->opened ? sfvalue(sp->opened) : -1;
785 key->disc->data = REC_N_TYPE();
786 }
787 if (z && key->disc->data == REC_N_TYPE())
788 error(3, "%s: record format cannot be determined from data sample", key->input[0]);
789 }
790 if (RECTYPE(key->disc->data) == REC_fixed)
791 key->fixed = REC_F_SIZE(key->disc->data);
792 if (map && key->output && key->disc->data != REC_N_TYPE() && (stat(key->output, &st) || S_ISREG(st.st_mode)))
793 {
794 if (p = strrchr(key->output, '/'))
795 s = p + 1;
796 else
797 s = key->output;
798 if (!strchr(s, '%'))
799 {
800 p = key->output;
801 if (!(e = strrchr(s, '.')))
802 e = s + strlen(s);
803 if (RECTYPE(key->disc->data) == REC_variable && !REC_V_SIZE(key->disc->data))
804 key->disc->data |= ((1<<15)-1);
805 if (!(s = strdup(sfprints("%-*.*s%%%s%s", e - p, e - p, p, fmtrec(key->disc->data, 1), e))))
806 error(ERROR_SYSTEM|3, "out of space");
807 key->output = s;
808 if (sp->verbose)
809 error(0, "%s rename output %s => %s", error_info.id, p, s);
810 }
811 }
812 if (sp->verbose)
813 error(0, "%s %s record format", error_info.id, fmtrec(key->disc->data, 0));
814 return error_info.errors != 0;
815 }
816
817 /*
818 * dump keys to stderr
819 */
820
821 static ssize_t
dumpkey(Rs_t * rs,unsigned char * dat,size_t datlen,unsigned char * key,size_t keylen,Rsdisc_t * disc)822 dumpkey(Rs_t* rs, unsigned char* dat, size_t datlen, unsigned char* key, size_t keylen, Rsdisc_t* disc)
823 {
824 Sort_t* sp = (Sort_t*)RSKEYDISC(disc);
825 ssize_t n;
826 int i;
827 char buf[2];
828
829 if ((n = (*sp->defkeyf)(rs, dat, datlen, key, keylen, disc)) > 0)
830 {
831 buf[1] = 0;
832 for (i = 0; i < n; i++)
833 {
834 buf[0] = key[i];
835 sfputr(sp->tp, fmtesc(buf), -1);
836 }
837 sfprintf(sfstderr, "key: %s\n", sfstruse(sp->tp));
838 }
839 return n;
840 }
841
842 /*
843 * initialize sp from argv
844 */
845
846 static int
init(register Sort_t * sp,Rskeydisc_t * dp,char ** argv)847 init(register Sort_t* sp, Rskeydisc_t* dp, char** argv)
848 {
849 register Rskey_t* key;
850 register char* s;
851 register char** p;
852 char* t;
853 int n;
854 unsigned long x;
855 unsigned long z;
856 size_t fixed;
857 struct stat is;
858 struct stat os;
859
860 memset(sp, 0, sizeof(*sp));
861 sp->xfiles = elementsof(sp->files);
862 sfset(sfstdout, SF_SHARE, 0);
863 sfset(sfstderr, SF_SHARE, 0);
864 Vmdcsbrk->round = INBRK;
865 dp->version = RSKEY_VERSION;
866 dp->flags = 0;
867 dp->errorf = errorf;
868 if (!(sp->key = key = rskeyopen(dp, NiL)) || !(sp->rec = rsnew(key->disc)))
869 return -1;
870 z = key->insize = 2 * INMAX;
871 #if 0
872 if (conformance(0, 0))
873 #endif
874 key->type |= RS_DATA;
875 if ((n = strtol(astconf("PAGESIZE", NiL, NiL), &t, 0)) > 0 && !*t)
876 key->alignsize = n;
877 if ((n = parse(sp, argv)) || rskeyinit(key))
878 {
879 rskeyclose(key);
880 if (n < 0)
881 error(ERROR_USAGE|4, "%s", optusage(NiL));
882 return -1;
883 }
884
885 /*
886 * finalize the buffer dimensions
887 */
888
889 if ((x = key->insize) != z)
890 z = 0;
891 if (x > INMAX)
892 x = INMAX;
893 else if (x < INMIN && !sp->chunk)
894 x = INMIN;
895 if (sp->single = !key->input[1])
896 {
897 if (!(sp->opened = fileopen(sp, key->input[0])))
898 error(ERROR_SYSTEM|3, "%s: cannot open", key->input[0]);
899 if (fstat(sffileno(sp->opened), &is))
900 error(ERROR_SYSTEM|3, "%s: cannot stat", key->input[0]);
901 if (!S_ISREG(is.st_mode) || sp->opened->disc) /* XXX: need sfio call to test if any disc pushed */
902 {
903 sp->total = 0;
904 sp->test |= TEST_read;
905 }
906 else if (x > (sp->total = is.st_size))
907 x = sp->total;
908 }
909 else
910 sp->test |= TEST_read;
911 if (sp->zip & SF_READ)
912 sp->test |= TEST_read;
913 fixed = key->fixed;
914 if ((sp->test & TEST_reserve) || !(sp->test & TEST_read))
915 {
916 sp->map = 1;
917 if (z)
918 x = key->procsize;
919 if (fixed)
920 x += fixed - x % fixed;
921 }
922 else
923 {
924 if (z)
925 x = key->procsize;
926 for (;;)
927 {
928 if (fixed)
929 x += fixed - x % fixed;
930 if (sp->buf = (char*)vmalign(Vmheap, x, key->alignsize))
931 break;
932 if ((x >>= 1) < INMIN)
933 error(ERROR_SYSTEM|3, "out of space");
934 }
935 sp->hit = x - key->alignsize;
936 }
937 if (sp->test & TEST_keys)
938 {
939 if (!key->disc->defkeyf)
940 error(2, "no key function to intercept");
941 else if (!(sp->tp = sfstropen()))
942 error(ERROR_SYSTEM|3, "out of space");
943 else
944 {
945 sp->defkeyf = key->disc->defkeyf;
946 key->disc->defkeyf = dumpkey;
947 }
948 }
949 if (key->nproc > 1)
950 {
951 off_t offset;
952 off_t total;
953 off_t size;
954 size_t chunk;
955 int i;
956 Job_t* jp;
957
958 if (!sp->map || pathstdin(key->input[0]))
959 {
960 uno:
961 key->nproc = 1;
962 }
963 else if ((n = (sp->total + key->procsize - 1) / (key->procsize)) <= 1)
964 goto uno;
965 else
966 {
967 if (n < key->nproc)
968 key->nproc = n;
969 else
970 n = key->nproc;
971 if (!(sp->jobs = vmnewof(Vmheap, 0, Job_t, n, 0)))
972 goto uno;
973 size = (sp->total + n - 1) / n;
974 if (fixed)
975 {
976 if (size % fixed)
977 size += fixed - size % fixed;
978 i = (size + x - 1) / x;
979 if (i * n > sp->xfiles)
980 {
981 error(1, "multi-process multi-stage not implemented; falling back to one processor");
982 goto uno;
983 }
984 chunk = size / i;
985 if (chunk % fixed)
986 chunk += fixed - chunk % fixed;
987 size = chunk * i;
988 offset = 0;
989 total = sp->total;
990 for (jp = sp->jobs; jp < sp->jobs + n; jp++)
991 {
992 jp->offset = offset;
993 if (size > total)
994 size = total;
995 total -= (jp->size = size);
996 jp->chunk = chunk;
997 jp->intermediates = i;
998 offset += size;
999 }
1000 if (key->procsize > chunk)
1001 key->procsize = chunk;
1002 else
1003 {
1004 size = key->procsize;
1005 i = (chunk + size - 1) / size;
1006 size = chunk / i;
1007 if (size % fixed)
1008 size += fixed - size % fixed;
1009 key->procsize = size;
1010 }
1011 }
1012 else
1013 {
1014 register char* s;
1015 register char* t;
1016 register char* b;
1017 off_t ideal;
1018 off_t scan;
1019 char* file;
1020 Sfio_t* ip;
1021
1022 i = (size + x - 1) / x;
1023 if (i * n > sp->xfiles)
1024 {
1025 error(1, "multi-process multi-stage not implemented; falling back to one processor");
1026 goto uno;
1027 }
1028 chunk = (size + i - 1) / i;
1029 size = ideal = chunk * i;
1030 if ((scan = INREC) >= ideal)
1031 scan = (ideal / 32) * 4;
1032 offset = 0;
1033 total = sp->total;
1034 file = key->input[0];
1035 if (!(ip = fileopen(sp, file)))
1036 error(ERROR_SYSTEM|3, "%s: cannot read", file);
1037 for (jp = sp->jobs; jp < sp->jobs + n; jp++)
1038 {
1039 jp->offset = offset;
1040 if (((size = ideal) + scan) >= total)
1041 size = total;
1042 else
1043 {
1044 /*UNDENT...*/
1045
1046 /*
1047 * snoop around for the closest record boundary
1048 */
1049
1050 size -= scan / 2;
1051 if (sfseek(ip, offset + size, SEEK_SET) != (offset + size))
1052 error(ERROR_SYSTEM|3, "%s: record boundary seek error at offset %lld", file, (Sflong_t)offset + size);
1053 if (!(b = (char*)sfreserve(ip, scan, 0)))
1054 error(ERROR_SYSTEM|3, "%s: record boundary read error at offset %lld", file, (Sflong_t)offset + size);
1055 s = t = b + scan / 2 - 1;
1056 while (*s++ != '\n')
1057 {
1058 if (t < b)
1059 {
1060 bigger:
1061 if (((size += scan) + offset) >= (total - scan))
1062 error(3, "%s: monster record at offset %lld", (Sflong_t)offset);
1063 if (sfseek(ip, offset + size, SEEK_SET) != (offset + size))
1064 error(ERROR_SYSTEM|3, "%s: record boundary input seek error at %lld", file, (Sflong_t)offset + size);
1065 if (!(b = (char*)sfreserve(ip, scan, 0)))
1066 error(ERROR_SYSTEM|3, "%s: record boundary read error at %lld", file, (Sflong_t)offset + size);
1067 t = (s = b) + scan;
1068 do
1069 {
1070 if (s >= t)
1071 goto bigger;
1072 } while (*s++ != '\n');
1073 break;
1074 }
1075 if (*t-- == '\n')
1076 {
1077 s = t + 2;
1078 break;
1079 }
1080 }
1081 size += (s - b);
1082
1083 /*...INDENT*/
1084 }
1085 total -= (jp->size = size);
1086 jp->chunk = (size + i - 1) / i;
1087 if (jp->chunk > chunk)
1088 chunk = jp->chunk;
1089 jp->intermediates = i;
1090 offset += size;
1091 }
1092 if (rsfileclose(sp->rec, ip))
1093 return -1;
1094 sfclose(ip);
1095 key->procsize = (key->procsize > chunk) ? chunk : chunk / ((chunk + key->procsize - 1) / key->procsize);
1096 }
1097 }
1098 }
1099 key->insize = sp->end = x;
1100
1101 /*
1102 * check the output file for clash with the input files
1103 */
1104
1105 n = stat("/dev/null", &is);
1106 if (pathstdout(key->output))
1107 {
1108 key->output = "/dev/stdout";
1109 sp->op = sfstdout;
1110 if (!n && !fstat(sffileno(sp->op), &os) && os.st_dev == is.st_dev && os.st_ino == is.st_ino)
1111 key->type |= RS_IGNORE;
1112 }
1113 else if (key->input)
1114 {
1115 if (!stat(key->output, &os))
1116 {
1117 if (!n && os.st_dev == is.st_dev && os.st_ino == is.st_ino)
1118 key->type |= RS_IGNORE;
1119 else if (eaccess(key->output, W_OK))
1120 error(ERROR_SYSTEM|3, "%s: cannot write", key->output);
1121 else if (!fs3d(FS3D_TEST) || !iview(&os))
1122 {
1123 p = key->input;
1124 while (s = *p++)
1125 if (!pathstdin(s))
1126 {
1127 if (stat(s, &is))
1128 error(ERROR_SYSTEM|2, "%s: not found", s);
1129 else if (os.st_dev == is.st_dev && os.st_ino == is.st_ino)
1130 {
1131 if (t = strrchr(key->output, '/'))
1132 {
1133 s = key->output;
1134 *t = 0;
1135 }
1136 else
1137 s = ".";
1138 if (sp->overwrite = pathtemp(NiL, 0, s, error_info.id, &n))
1139 sp->op = sfnew(NiL, NiL, SF_UNBOUND, n, SF_WRITE);
1140 if (t)
1141 *t = '/';
1142 if (!sp->op || fstat(n, &is))
1143 error(ERROR_SYSTEM|3, "%s: cannot create overwrite file %s", key->output, sp->overwrite);
1144 if (os.st_uid != is.st_uid || os.st_gid != is.st_gid)
1145 sp->preserve = 1;
1146 break;
1147 }
1148 }
1149 }
1150 }
1151 if (!sp->overwrite && !(sp->op = sfopen(NiL, key->output, "w")))
1152 error(ERROR_SYSTEM|3, "%s: cannot write", key->output);
1153 }
1154 if (rsfilewrite(sp->rec, sp->op, key->output))
1155 return -1;
1156 if (key->outsize > 0)
1157 sfsetbuf(sp->op, NiL, key->outsize);
1158 if (sp->zip & SF_WRITE)
1159 sfdcgzip(sp->op, 0);
1160
1161 /*
1162 * finally ready for recsort now
1163 */
1164
1165 if (rsinit(sp->rec, key->meth, key->procsize, key->type, key))
1166 {
1167 error(ERROR_SYSTEM|2, "sort library initialization error");
1168 rskeyclose(key);
1169 return -1;
1170 }
1171 if (sp->rec->meth->type == RS_MTCOPY)
1172 sp->chunk = 1;
1173 if (sp->test & TEST_io)
1174 {
1175 for (n = 0; s = key->input[n]; n++)
1176 error(0, "%s input[%d]\t\"%s\"", error_info.id, n, s);
1177 if (s = key->output)
1178 error(0, "%s output\t\"%s\"", error_info.id, s);
1179 }
1180 return 0;
1181 }
1182
1183 /*
1184 * close sp->files and push fp if not 0
1185 */
1186
1187 static void
clear(register Sort_t * sp,Sfio_t * fp)1188 clear(register Sort_t* sp, Sfio_t* fp)
1189 {
1190 register int i;
1191
1192 for (i = fp ? sp->mfiles : 0; i < sp->nfiles; i++)
1193 {
1194 rstempclose(sp->rec, sp->files[i]);
1195 sp->files[i] = 0;
1196 }
1197 if (fp)
1198 {
1199 sp->files[sp->mfiles++] = fp;
1200 sp->nfiles = sp->mfiles;
1201 if (sp->mfiles >= (sp->xfiles - 1))
1202 sp->mfiles = 0;
1203 }
1204 else
1205 sp->nfiles = sp->mfiles = 0;
1206 }
1207
1208 /*
1209 * flush the intermediate data
1210 * r is the partial record offset
1211 * updated r is returned
1212 */
1213
1214 static ssize_t
flush(register Sort_t * sp,register size_t r)1215 flush(register Sort_t* sp, register size_t r)
1216 {
1217 register Sfio_t* fp;
1218 register size_t n;
1219 register size_t m;
1220 register size_t b;
1221
1222 if (sp->chunk)
1223 {
1224 /*
1225 * skip merge and output sorted chunk
1226 */
1227
1228 if (rswrite(sp->rec, sp->op, RS_OTEXT))
1229 {
1230 if (!error_info.errors)
1231 error(ERROR_SYSTEM|2, "%s: write error", sp->key->output);
1232 return -1;
1233 }
1234 }
1235 else if (sp->rec->meth->type != RS_MTVERIFY)
1236 {
1237 /*
1238 * write to an intermediate file and rewind for rsmerge
1239 */
1240
1241 if (!(fp = sp->files[sp->nfiles]))
1242 {
1243 if (sp->child || !(fp = rstempwrite(sp->rec, (Sfio_t*)0)))
1244 error(ERROR_SYSTEM|3, "cannot create intermediate sort file %d", sp->nfiles);
1245 sp->files[sp->nfiles] = fp;
1246 }
1247 sp->nfiles++;
1248 if (sp->verbose)
1249 error(0, "%s write intermediate", error_info.id);
1250 if (rswrite(sp->rec, fp, 0))
1251 {
1252 error(ERROR_SYSTEM|2, "intermediate sort file write error");
1253 return -1;
1254 }
1255 if (rstempread(sp->rec, fp))
1256 {
1257 error(ERROR_SYSTEM|2, "intermediate sort file rewind error");
1258 return -1;
1259 }
1260
1261 /*
1262 * multi-stage merge when open file limit exceeded
1263 */
1264
1265 if (sp->nfiles >= sp->xfiles)
1266 {
1267 if (sp->child || !(fp = rstempwrite(sp->rec, (Sfio_t*)0)))
1268 error(ERROR_SYSTEM|3, "cannot create intermediate merge file");
1269 if (sp->verbose)
1270 error(0, "%s merge multi-stage intermediate", error_info.id);
1271 if (rsmerge(sp->rec, fp, sp->files + sp->mfiles, sp->nfiles - sp->mfiles, 0))
1272 {
1273 error(ERROR_SYSTEM|2, "intermediate merge file write error");
1274 return -1;
1275 }
1276 if (rstempread(sp->rec, fp))
1277 {
1278 error(ERROR_SYSTEM|2, "intermediate merge file rewind error");
1279 return -1;
1280 }
1281 clear(sp, fp);
1282 }
1283 }
1284
1285 /*
1286 * slide over partial record data so the next read is aligned
1287 */
1288
1289 if (!sp->map && (m = sp->cur - r))
1290 {
1291 n = roundof(m, sp->key->alignsize) - m;
1292 if (n < r)
1293 {
1294 m = n;
1295 while (r < sp->cur)
1296 sp->buf[n++] = sp->buf[r++];
1297 sp->cur = n;
1298 }
1299 else
1300 {
1301 b = r;
1302 r += m;
1303 n += m;
1304 sp->cur = n;
1305 while (r > b)
1306 sp->buf[--n] = sp->buf[--r];
1307 m = n;
1308 }
1309 }
1310 else
1311 m = sp->cur = 0;
1312 return m;
1313 }
1314
1315 /*
1316 * input the records for file ip
1317 */
1318
1319 static int
input(register Sort_t * sp,Sfio_t * ip,const char * name,int last)1320 input(register Sort_t* sp, Sfio_t* ip, const char* name, int last)
1321 {
1322 register ssize_t n;
1323 register ssize_t p;
1324 register ssize_t m;
1325 register ssize_t r;
1326 size_t z;
1327 Sfoff_t w;
1328 char* b;
1329 int c;
1330 char del[2];
1331
1332 /*
1333 * align the read buffer and
1334 * loop on insize chunks
1335 */
1336
1337 error_info.file = ip == sfstdin ? (char*)0 : (char*)name;
1338 m = -1;
1339 z = 0;
1340 if (sp->bufsize)
1341 sfsetbuf(ip, NiL, sp->bufsize);
1342 else if (sfsize(ip) > SF_BUFSIZE)
1343 {
1344 if (sp->map)
1345 sfsetbuf(ip, NiL, z = sp->end);
1346 else
1347 sfsetbuf(ip, NiL, 0);
1348 }
1349 if (sp->map)
1350 w = sfsize(ip);
1351 r = sp->cur = roundof(sp->cur, sp->key->alignsize);
1352 p = 0;
1353 for (;;)
1354 {
1355 if (sp->cur > sp->hit)
1356 {
1357 if (sp->single && !sp->nfiles && sp->total == (sp->map ? 0 : p))
1358 break;
1359 if ((r = flush(sp, r)) < 0)
1360 return -1;
1361 }
1362 if (!sp->map)
1363 {
1364 n = sfeof(ip) ? 0 : sfread(ip, sp->buf + sp->cur, sp->end - sp->cur);
1365 if (last)
1366 {
1367 if ((c = sfgetc(ip)) == EOF)
1368 sp->rec->type |= RS_LAST;
1369 else
1370 sfungetc(ip, c);
1371 }
1372 }
1373 else
1374 {
1375 sp->buf = (char*)sfreserve(ip, m, SF_LOCKR);
1376 n = sfvalue(ip);
1377 if (!sp->buf)
1378 {
1379 if (m < 0 && n < -m && z == sp->end)
1380 {
1381 sfsetbuf(ip, NiL, z = 2 * sp->end);
1382 sp->buf = (char*)sfreserve(ip, m, SF_LOCKR);
1383 n = sfvalue(ip);
1384 if (sp->verbose && n)
1385 error(0, "%s buffer boundary expand to %I*d", error_info.id, sizeof(n), n);
1386 }
1387 if (!sp->buf && n > 0 && !(sp->buf = sfreserve(ip, n, SF_LOCKR)))
1388 n = -1;
1389 }
1390 if (last && w == (sftell(ip) + n))
1391 sp->rec->type |= RS_LAST;
1392 if (sp->verbose)
1393 error(0, "%s reserve %*d => %*d", error_info.id, sizeof(m), m, sizeof(n), n);
1394 }
1395 if (n <= 0)
1396 {
1397 if (n < 0)
1398 error(ERROR_SYSTEM|3, "read error");
1399 if (sp->cur <= r)
1400 break;
1401 if (sp->key->fixed)
1402 {
1403 error(1, "incomplete record length=%lld", (Sflong_t)(sp->cur - r));
1404 break;
1405 }
1406 if (RECTYPE(sp->key->disc->data) == REC_delimited && sp->buf[sp->cur - 1] != (c = REC_D_DELIMITER(sp->key->disc->data)))
1407 {
1408 sp->buf[sp->cur++] = c;
1409 if (c == '\n')
1410 error(1, "newline appended");
1411 else
1412 {
1413 del[0] = c;
1414 del[1] = 0;
1415 error(1, "%s appended", fmtquote(del, "'", NiL, 1, 0));
1416 }
1417 }
1418 }
1419 sp->cur += n;
1420 process:
1421 if (sp->verbose && !sp->child)
1422 error(ERROR_PROMPT, "%s process %lld ->", error_info.id, (Sflong_t)(sp->cur - r));
1423 if ((p = rsprocess(sp->rec, sp->buf + r, sp->cur - r)) < 0)
1424 error(ERROR_SYSTEM|3, "sort error");
1425 if (sp->verbose)
1426 {
1427 if (sp->child)
1428 error(0, "%s process %lld -> %lld", error_info.id, (Sflong_t)(sp->cur - r), (Sflong_t)p);
1429 else
1430 error(0, " %lld", (Sflong_t)p);
1431 }
1432 if (sp->map)
1433 {
1434 if (sp->map > 2)
1435 break;
1436 sfread(ip, sp->buf, p);
1437 if (p)
1438 {
1439 m = -(n - p + 1);
1440 if (((sp->total -= p) / 3) < (sp->end / 2) && sp->total > sp->end)
1441 {
1442 if ((r = flush(sp, r)) < 0)
1443 return -1;
1444 sfsetbuf(ip, NiL, sp->total);
1445 }
1446 }
1447 else if (sp->map == 1)
1448 {
1449 sp->map++;
1450 m = -(n + 1);
1451 }
1452 else if (n > sp->end)
1453 {
1454 error(2, "monster record", n, sp->end, sp->cur, p);
1455 break;
1456 }
1457 else if (sp->key->fixed)
1458 {
1459 error(1, "incomplete record length=%ld", n - p);
1460 break;
1461 }
1462 else
1463 {
1464 sp->cur = n - p;
1465 if (!(b = vmnewof(Vmheap, 0, char, sp->cur, 1)))
1466 error(ERROR_SYSTEM|3, "out of space");
1467 memcpy(b, sp->buf + p, sp->cur);
1468 if (RECTYPE(sp->key->disc->data) == REC_delimited && b[sp->cur - 1] != (c = REC_D_DELIMITER(sp->key->disc->data)))
1469 {
1470 b[sp->cur++] = '\n';
1471 if (c == '\n')
1472 error(1, "newline appended");
1473 else
1474 {
1475 del[0] = c;
1476 del[1] = 0;
1477 error(1, "%s appended", fmtquote(del, "'", "'", 1, 0));
1478 }
1479 }
1480 sp->buf = b;
1481 sp->map++;
1482 goto process;
1483 }
1484 }
1485 else
1486 r += p;
1487 }
1488 error_info.file = 0;
1489 return 0;
1490 }
1491
1492 /*
1493 * sfio part discipline read
1494 */
1495
1496 static ssize_t
partread(Sfio_t * fp,Void_t * buf,size_t size,Sfdisc_t * dp)1497 partread(Sfio_t* fp, Void_t* buf, size_t size, Sfdisc_t* dp)
1498 {
1499 register Part_t* pp = (Part_t*)dp;
1500
1501 if (pp->remain <= 0)
1502 return 0;
1503 if (size > pp->remain)
1504 size = pp->remain;
1505 pp->remain -= size;
1506 return sfrd(fp, buf, size, dp);
1507 }
1508
1509 /*
1510 * sfio part discipline seek
1511 */
1512
1513 static Sfoff_t
partseek(Sfio_t * fp,Sfoff_t lloffset,int op,Sfdisc_t * dp)1514 partseek(Sfio_t* fp, Sfoff_t lloffset, int op, Sfdisc_t* dp)
1515 {
1516 register Part_t* pp = (Part_t*)dp;
1517 off_t offset = lloffset;
1518
1519 switch (op)
1520 {
1521 case SEEK_SET:
1522 offset += pp->offset;
1523 break;
1524 case SEEK_CUR:
1525 offset += pp->offset;
1526 break;
1527 case SEEK_END:
1528 offset = pp->offset + pp->size - offset;
1529 op = SEEK_SET;
1530 break;
1531 }
1532 if ((offset = sfsk(fp, offset, op, dp)) >= 0)
1533 {
1534 offset -= pp->offset;
1535 pp->remain = pp->size - offset;
1536 }
1537 return offset;
1538 }
1539
1540 /*
1541 * job control
1542 * requires single named input file
1543 * no multi-stage merge
1544 */
1545
1546 static void
jobs(register Sort_t * sp)1547 jobs(register Sort_t* sp)
1548 {
1549 register Job_t* jp;
1550 register Job_t* xp;
1551 register int i;
1552 register int j;
1553 register int f;
1554 int status;
1555 char* file;
1556 Sfio_t* ip;
1557 Part_t part;
1558 char id[32];
1559
1560 sp->single = 0;
1561 if (sp->verbose)
1562 error(0, "%s %d processes %lld total", error_info.id, sp->key->nproc, (Sflong_t)sp->total);
1563 xp = sp->jobs + sp->key->nproc;
1564 if (sp->test & TEST_show)
1565 {
1566 for (jp = sp->jobs; jp < xp; jp++)
1567 error(0, "%s#%d pos %12lld : len %10lld : buf %10lld : num %2d", error_info.id, jp - sp->jobs + 1, (Sflong_t)jp->offset, (Sflong_t)jp->size, (Sflong_t)jp->chunk, jp->intermediates);
1568 exit(0);
1569 }
1570 f = 0;
1571 for (jp = sp->jobs; jp < xp; jp++)
1572 for (i = 0; i < jp->intermediates; i++)
1573 if (!(sp->files[f++] = rstempwrite(sp->rec, (Sfio_t*)0)))
1574 error(ERROR_SYSTEM|3, "cannot create intermediate file %d", i);
1575 part.disc.readf = partread;
1576 part.disc.writef = 0;
1577 part.disc.seekf = partseek;
1578 part.disc.exceptf = 0;
1579 part.disc.disc = 0;
1580 file = sp->key->input[0];
1581 j = 0;
1582 for (jp = sp->jobs; jp < xp; jp++)
1583 {
1584 ip = fileopen(sp, file);
1585 switch (fork())
1586 {
1587 case -1:
1588 error(ERROR_SYSTEM|3, "not enough child processes");
1589 case 0:
1590 sp->child = 1;
1591 sfsprintf(id, sizeof(id), "%s#%d", error_info.id, jp - sp->jobs + 1);
1592 error_info.id = id;
1593 sp->end = jp->chunk;
1594 part.offset = jp->offset;
1595 sp->total = part.size = part.remain = jp->size;
1596 sfdisc(ip, &part.disc);
1597 for (i = 0; i < jp->intermediates; i++)
1598 sp->files[i] = sp->files[j++];
1599 while (i < f)
1600 sp->files[i++] = 0;
1601 if (sp->verbose)
1602 error(0, "%s pos %12lld : len %10lld : buf %10lld : num %2d", error_info.id, (Sflong_t)jp->offset, (Sflong_t)jp->size, (Sflong_t)jp->chunk, jp->intermediates);
1603 exit(input(sp, ip, file, 0) < 0);
1604 }
1605 if (rsfileclose(sp->rec, ip))
1606 exit(1);
1607 sfclose(ip);
1608 j += jp->intermediates;
1609 }
1610 sp->nfiles = f;
1611 i = 0;
1612 j = sp->key->nproc;
1613 while (j > 0)
1614 {
1615 if (wait(&status) != -1)
1616 {
1617 if (status)
1618 i++;
1619 j--;
1620 }
1621 else if (errno != EINTR)
1622 {
1623 error(ERROR_SYSTEM|3, "%d process%s did not complete", j, j == 1 ? "" : "es");
1624 break;
1625 }
1626 }
1627 if (i)
1628 error(3, "%d child process%s failed", i, i == 1 ? "" : "es");
1629 }
1630
1631 /*
1632 * all done
1633 */
1634
1635 static void
done(register Sort_t * sp)1636 done(register Sort_t* sp)
1637 {
1638 while (rsdisc(sp->rec, NiL, RS_POP));
1639 if ((sfsync(sp->op) || sp->op != sfstdout && rsfileclose(sp->rec, sp->op)) && !error_info.errors)
1640 error(ERROR_SYSTEM|2, "%s: write error", sp->key->output);
1641 if (rsclose(sp->rec))
1642 error(2, "sort error");
1643 if (sp->map > 2)
1644 vmfree(Vmheap, sp->buf);
1645
1646 /*
1647 * if the output would have overwritten an input
1648 * file now is the time to commit to it
1649 */
1650
1651 if (sp->overwrite)
1652 {
1653 if (error_info.errors)
1654 remove(sp->overwrite);
1655 else if (sp->preserve)
1656 {
1657 Sfio_t* ip;
1658 Sfio_t* op;
1659
1660 if (ip = sfopen(NiL, sp->overwrite, "r"))
1661 {
1662 if (op = sfopen(NiL, sp->key->output, "w"))
1663 {
1664 if ((sfmove(ip, op, SF_UNBOUND, -1) < 0 || sfclose(op) || !sfeof(ip)) && !error_info.errors)
1665 error(ERROR_SYSTEM|2, "%s: write error", sp->key->output);
1666 sfclose(op);
1667 }
1668 else
1669 error(ERROR_SYSTEM|2, "%s: cannot write", sp->key->output);
1670 sfclose(ip);
1671 remove(sp->overwrite);
1672 }
1673 else
1674 error(ERROR_SYSTEM|2, "%s: cannot read", sp->overwrite);
1675 sp->preserve = 0;
1676 }
1677 else if (remove(sp->key->output) || rename(sp->overwrite, sp->key->output))
1678 error(ERROR_SYSTEM|2, "%s: cannot overwrite", sp->key->output);
1679 free(sp->overwrite);
1680 sp->overwrite = 0;
1681 }
1682 if (rskeyclose(sp->key))
1683 error(2, "sort key error");
1684 }
1685
1686 int
main(int argc,char ** argv)1687 main(int argc, char** argv)
1688 {
1689 register char* s;
1690 register Sfio_t* fp;
1691 char* last;
1692 char** merge;
1693 int i;
1694 struct stat st;
1695 Sort_t sort;
1696
1697 error_info.id = "sort";
1698 if (init(&sort, &sort.disc, argv))
1699 exit(1);
1700 if (sort.test & TEST_dump)
1701 {
1702 sfprintf(sfstderr, "main\n\tintermediates=%d\n", sort.xfiles);
1703 rskeydump(sort.key, sfstderr);
1704 }
1705 if (sort.key->type & RS_CAT)
1706 {
1707 while (s = *sort.key->input++)
1708 {
1709 fp = fileopen(&sort, s);
1710 if (sfmove(fp, sfstdout, SF_UNBOUND, -1) < 0 || !sfeof(fp))
1711 error(ERROR_SYSTEM|2, "%s: read error", s);
1712 if (sferror(sfstdout) || rsfileclose(sort.rec, fp))
1713 break;
1714 }
1715 }
1716 else
1717 {
1718 merge = sort.key->merge && sort.key->input[0] && sort.key->input[1] ? sort.key->input : (char**)0;
1719 fp = 0;
1720 if (sort.jobs)
1721 jobs(&sort);
1722 else if (sort.test & TEST_show)
1723 exit(0);
1724 else
1725 {
1726 last = 0;
1727 if (!merge && (sort.rec->type & RS_MORE))
1728 for (i = 0; s = sort.key->input[i]; i++)
1729 if ((pathstdin(s) && !fstat(0, &st) || !stat(s, &st)) && st.st_size)
1730 last = s;
1731 while (s = *sort.key->input++)
1732 {
1733 fp = fileopen(&sort, s);
1734 if (merge)
1735 {
1736 if (sort.nfiles >= sort.xfiles)
1737 {
1738 clear(&sort, NiL);
1739 sort.key->input = merge;
1740 merge = 0;
1741 if (fp != sfstdin)
1742 sfclose(fp);
1743 fp = 0;
1744 continue;
1745 }
1746 sort.files[sort.nfiles++] = fp;
1747 }
1748 else if (input(&sort, fp, s, s == last) < 0)
1749 break;
1750 else if (fp != sfstdin && !sort.map)
1751 {
1752 sfclose(fp);
1753 fp = 0;
1754 }
1755 }
1756 }
1757 if (sort.nfiles)
1758 {
1759 if (sort.cur && flush(&sort, sort.cur) < 0)
1760 return 1;
1761 if (sort.verbose)
1762 error(0, "%s merge text", error_info.id);
1763 if (rsmerge(sort.rec, sort.op, sort.files, sort.nfiles, merge ? RS_TEXT : RS_OTEXT))
1764 error(ERROR_SYSTEM|3, "merge error");
1765 clear(&sort, NiL);
1766 }
1767 else
1768 {
1769 if (sort.verbose)
1770 error(0, "%s write text", error_info.id);
1771 if (rswrite(sort.rec, sort.op, RS_OTEXT) && !error_info.errors)
1772 error(ERROR_SYSTEM|3, "%s: write error", sort.key->output);
1773 if (fp && fp != sfstdin)
1774 sfclose(fp);
1775 }
1776 }
1777 done(&sort);
1778 exit(error_info.errors != 0);
1779 }
1780