1 /***********************************************************************
2 *                                                                      *
3 *               This software is part of the ast package               *
4 *          Copyright (c) 1996-2013 AT&T Intellectual Property          *
5 *                      and is licensed under the                       *
6 *                 Eclipse Public License, Version 1.0                  *
7 *                    by AT&T Intellectual Property                     *
8 *                                                                      *
9 *                A copy of the License is available at                 *
10 *          http://www.eclipse.org/org/documents/epl-v10.html           *
11 *         (with md5 checksum b35adb5213ca9657e911e9befb180842)         *
12 *                                                                      *
13 *              Information and Software Systems Research               *
14 *                            AT&T Research                             *
15 *                           Florham Park NJ                            *
16 *                                                                      *
17 *               Glenn Fowler <glenn.s.fowler@gmail.com>                *
18 *                     Phong Vo <phongvo@gmail.com>                     *
19 *              Doug McIlroy <doug@research.bell-labs.com>              *
20 *                                                                      *
21 ***********************************************************************/
22 #pragma prototyped
23 
24 /*
25  * sort main
26  *
27  * algorithm and interface
28  *
29  *	Glenn Fowler
30  *	Phong Vo
31  *	AT&T Research
32  *
33  * key coders
34  *
35  *	Doug McIlroy
36  *	Bell Laboratories
37  */
38 
39 static const char usage[] =
40 "[-n?\n@(#)$Id: sort (AT&T Research) 2013-09-19 $\n]"
41 USAGE_LICENSE
42 "[+NAME?sort - sort and/or merge files]"
43 "[+DESCRIPTION?\bsort\b sorts lines of all the \afiles\a together and "
44     "writes the result on the standard output. The file name \b-\b means the "
45     "standard input. If no files are named, the standard input is sorted.]"
46 "[+?The default sort key is an entire line. Default ordering is "
47     "lexicographic by bytes in machine collating sequence. The ordering is "
48     "affected globally by the locale and/or the following options, one or "
49     "more of which may appear. See \brecsort\b(3) for details.]"
50 "[+?For backwards compatibility the \b-o\b option is allowed in any file "
51     "operand position when neither the \b-c\b nor the \b--\b options are "
52     "specified.]"
53 "[k:key?Restrict the sort key to a string beginning at \apos1\a and "
54     "ending at \apos2\a. \apos1\a and \apos2\a each have the form \am.n\a, "
55     "counting from 1, optionally followed by one or more of the flags "
56     "\bCMbdfginprZ\b; \bm\b counts fields from the beginning of the line and "
57     "\bn\b counts characters from the beginning of the field. If any flags "
58     "are present they override all the global ordering options for this key. "
59     "If \a.n\a is missing from \apos1\a, it is taken to be 1; if missing "
60     "from \apos2\a, it is taken to be the end of the field. If \apos2\a is "
61     "missing, it is taken to be end of line. The second form specifies a "
62     "fixed record length \areclen\a, and the last form specifies a fixed "
63     "field at byte position \aposition\a (counting from 1) of \alength\a "
64     "bytes. The obsolescent \breclen:fieldlen:offset\b (byte offset from 0) "
65     "is also accepted.]:[pos1[,pos2]]|.reclen|.position.length]]]]]"
66 "[K:oldkey?Specified in pairs: \b-K\b \apos1\a \b-K\b \apos2\a, where "
67     "positions count from 0.]# [pos]"
68 "[R:record|recfmt?Sets the record format to \aformat\a; newlines will be "
69     "treated as normal characters. The formats are:]:[format]"
70     "{"
71         "[+d[\aterminator\a]]?Variable length with record \aterminator\a "
72             "character, \b\\n\b by default.]"
73         "[+[f]]\areclen\a?Fixed record length \areclen\a.]"
74         "[+v[op...]]?Variable length. \bh4o0z2bi\b (4 byte IBM V format "
75             "descriptor) if \aop\a are omitted. \aop\a may be a combination "
76             "of:]"
77             "{"
78                 "[+h\an\a?Header size is \an\a bytes (default 4).]"
79                 "[+o\an\a?Size offset in header is \an\a bytes (default "
80                     "0).]"
81                 "[+z\an\a?Size length is \an\a bytes (default "
82                     "min(\bh\b-\bo\b,2)).]"
83                 "[+b?Size is big-endian (default).]"
84                 "[+l?Size is little-endian (default \bb\b).]"
85                 "[+i?Record length includes header (default).]"
86                 "[+n?Record length does not include header (default "
87                     "\bi\b).]"
88             "}"
89         "[+%?If the record format is not otherwise specified, and the "
90             "any input file name, from left to right, ends with "
91             "\b%\b\aformat\a or \b%\b\aformat\a\b.\b* then the record format "
92             "is set to \aformat\a. In addition, the \b-o\b path, if "
93             "specified and if it does not contain \b%\b and if it names a "
94             "regular file, is renamed to contain the input \b%\b\aformat\a.]"
95         "[+-?The first block of the first input file is sampled to check "
96             "for \bv\b variable length and \bf\b fixed length format "
97             "records. Not all formats are detected. \bsort\b exits with an "
98             "error diagnostic if the record format cannot be determined from "
99             "the sample.]"
100     "}"
101 "[b:ignorespace|ignore-leading-blanks?Ignore leading white space (spaces "
102     "and tabs) in field comparisons.]"
103 "[d:dictionary?`Phone directory' order: only letters, digits and white "
104     "space are significant in string comparisons.]"
105 "[E:codeset|convert?The field data codeset is \acodeset\a or the field "
106     "data must be converted from the \afrom\a codeset to the \ato\a codeset. "
107     "The codesets are:]:[codeset|from::to]"
108     "{\fcodesets\f}"
109 "[f:fold|ignorecase?Fold lower case letters onto upper case.]"
110 "[h:scaled|human-readable?Compare numbers scaled with IEEE 1541-2002 "
111     "suffixes.]"
112 "[i:ignorecontrol?Ignore characters outside the ASCII range 040-0176 in "
113     "string comparisons.]"
114 "[J:shuffle|jumble?Do a random shuffle of the sort keys. \aseed\a "
115     "specifies a pseudo random number generator seed. A \aseed\a of 0 "
116     "generates a seed based on time and pid.]#[seed]"
117 "[n:numeric?An initial numeric string, consisting of optional white "
118     "space, optional sign, and a nonempty string of digits with optional "
119     "decimal point, is sorted by value.]"
120 "[g:floating?Numeric, like \b-n\b, with \be\b-style exponents allowed.]"
121 "[p:bcd|packed-decimal?Compare packed decimal (bcd) numbers with "
122     "trailing sign.]"
123 "[M:months?Compare as month names. The first three characters after "
124     "optional white space are folded to lower case and compared. Invalid "
125     "fields compare low to \bjan\b.]"
126 "[r:reverse|invert?Reverse the sense of comparisons.]"
127 "[t:tabs?`Tab character' separating fields is \achar\a.]:[tab-char]"
128 "[c:check?Check that the single input file is sorted according to the "
129     "ordering rules; give no output on the standard output. If the input "
130     "is out of sort then write one diagnostic line on the standard error "
131     "and exit with code \b1\b.]"
132 "[C:silent-check?Like \b--check\b except no diagnostic is written.]"
133 "[j:processes|nproc|jobs?Use up to \ajobs\a separate processes to sort "
134     "the input. The current implementation still uses one process for the "
135     "final merge phase; improvements are planned.]#[processes]"
136 "[m:merge?Merge; the input files are already sorted.]"
137 "[u:unique?Unique. Keep only the first of multiple records that compare "
138     "equal on all keys. Implies \b-s\b.]"
139 "[s:stable?Stable sort. When all keys compare equal, preserve input "
140     "order. The default is \b--nostable\b (\aunstable\a sort): when all "
141     "keys compare equal, break the tie by using the entire record, ignoring "
142     "all but the \b-r\b option.]"
143 "[o:output?Place output in the designated \afile\a instead of on the "
144     "standard output. This file may be the same as one of the inputs. The "
145     "\afile\a \b-\b names the standard output. The option may appear among "
146     "the file arguments, except after \b--\b.]:[output]"
147 "[l:library?Load the external sort discipline \alibrary\a with optional "
148     "comma separated \aname=value\a arguments. Libraries are loaded, in left "
149     "to right order, after the sort method has been "
150     "initialized.]:[library[,name=value...]]]"
151 "[T:tempdir?Put temporary files in \atempdir\a.]:[tempdir:=/usr/tmp]"
152 "[L:list?List the available sort methods. See the \b-x\b option.]"
153 "[x:method?Specify the sort method to apply:]:[method:=rasp]"
154     "{\fmethods\f} [v:verbose?Trace the sort progress on the standard "
155     "error.]"
156 "[P:plugins?List plugin information for each \afile\a operand in "
157     "--\astyle\a on the standard error. If no \afile\a operands are "
158     "specified then the first instance of each \bsort\b plugin installed on "
159     "\b$PATH\b or a sibling dir on \b$PATH\b is listed. The special "
160     "\astyle\a \blist\b lists a line on the standard output for each plugin "
161     "with the name, a tab character, and plugin specific command line "
162     "options parameterized by \b${style}\b (suitable for \beval\b'ing in "
163     "\bsh\b(1).)]:[style:=list|man|html|nroff|usage]"
164 "[Z:zd|zoned-decimal?Compare zoned decimal (ZD) numbers with embedded "
165     "trailing sign.]"
166 "[z:size|zip?Suggest using the specified number of bytes of internal "
167     "store to tune performance. Power of 2 and power of 10 size suffixes are "
168     "accepted. Type is a single character and may be one of:]:[type[size]]]"
169     "{"
170         "[+a?Buffer alignment.]"
171         "[+b?Input reserve buffer size.]"
172         "[+c?Input chunk size; sort chunks of this size and disable "
173             "merge.]"
174         "[+i?Input buffer size.]"
175         "[+m?Maximum number of intermediate merge files.]"
176         "[+p?Input sort size; sort chunks of this size before merge.]"
177         "[+o?Output buffer size.]"
178         "[+r?Maximum record size.]"
179         "[+I?Decompress the input if it is compressed.]"
180         "[+O?\bgzip\b(1) compress the output.]"
181     "}"
182 "[X:test?Enables implementation defined test code. Some or all of these "
183     "may be disabled.]:[test]"
184     "{"
185         "[+dump?List detailed information on the option settings.]"
186         "[+io?List io file paths.]"
187         "[+keys?List the canonical key for each record.]"
188         "[+read?Force input file read by disabling memory mapping.]"
189         "[+show?Show setup information and exit before sorting.]"
190         "[+test?Immediatly exit with status 0; used to verify this "
191             "implementation]"
192     "}"
193 "[D:debug?Sets the debug trace level. Higher levels produce more "
194     "output.]# [level]"
195 "[S|y?Equivalent to \b-zp\b\asize\a; if \asize\a has no suffix then \bki\b "
196     "is assumed.]:[size]"
197 
198 "\n"
199 "\n[ file ... ]\n"
200 "\n"
201 
202 "[+?+\apos1\a -\apos2\a is the classical alternative to \b-k\b, with "
203     "counting from 0 instead of 1, and pos2 designating next-after-last "
204     "instead of last character of the key. A missing character count in "
205     "\apos2\a means 0, which in turn excludes any \b-t\b tab character from "
206     "the end of the key. Thus +1 -1.3 is the same as \b-k\b 2,2.3 and +1r -3 "
207     "is the same as \b-k\b 2r,3.]"
208 "[+?Under option \b-t\b\ax\a fields are strings separated by \ax\a; "
209     "otherwise fields are non-empty strings separated by white space. White "
210     "space before a field is part of the field, except under option \b-b\b. "
211     "A \bb\b flag may be attached independently to \apos1\a and \apos2\a.]"
212 "[+?When there are multiple sort keys, later keys are compared only "
213     "after all earlier keys compare equal. Except under option \b-s\b, lines "
214     "with all keys equal are ordered with all bytes significant. \b-S\b "
215     "turns off \b-s\b, the last occurrence, left-to-right, takes affect.]"
216 "[+?Sorting is done by a method determined by the \b-x\b option. \b-L\b "
217     "lists the available methods. rasp (radix+splay-tree) is the default and "
218     "current all-around best.]"
219 "[+?Single-letter options may be combined into a single string, such as "
220     "\b-cnrt:\b. The option combination \b-di\b and the combination of "
221     "\b-n\b with any of \b-diM\b are improper. Posix argument conventions "
222     "are supported.]"
223 "[+?Options \b-b\b, \b-c\b, \b-d\b, \b-f\b, \b-i\b, \b-k\b, \b-m\b, "
224     "\b-n\b, \b-o\b, \b-r\b, \b-t\b, and \b-u\b are in the Posix and/or "
225     "X/Open standards.]"
226 
227 "[+DIAGNOSTICS?\asort\a comments and exits with non-zero status for "
228     "various trouble conditions and for disorder discovered under option "
229     "\b-c\b.]"
230 "[+SEE ALSO?\bcomm\b(1), \bjoin\b(1), \buniq\b(1), \brecsort\b(3)]"
231 "[+CAVEATS?The never-documented default \apos1\a=0 for cases such as "
232     "\bsort -1\b has been abolished. An input file overwritten by \b-o\b is "
233     "not replaced until the entire output file is generated in the same "
234     "directory as the input, at which point the input is renamed.]"
235 ;
236 
237 #include <sfio_t.h>
238 #include <ast.h>
239 #include <error.h>
240 #include <debug.h>
241 #include <ctype.h>
242 #include <fs3d.h>
243 #include <ls.h>
244 #include <option.h>
245 #include <recsort.h>
246 #include <recfmt.h>
247 #include <sfdcgzip.h>
248 #include <vmalloc.h>
249 #include <wait.h>
250 #include <iconv.h>
251 #include <dlldefs.h>
252 
/*
 * input buffer dimensions; init() clamps the requested input size
 * between INMIN and INMAX and uses INBRK as the Vmdcsbrk rounding
 */

#define INMIN		(1024)		/* min input buffer size	*/
#define INBRK		(64*INMIN)	/* default heap increment	*/
#define INMAX		(1024*INBRK)	/* max input buffer size	*/
#define INREC		(16*INMIN)	/* record begin chunk size	*/

/*
 * test bit mask values; the -X option in parse() maps the test
 * names dump, io, keys, read, show and reserve onto these bits
 */

#define TEST_dump	0x80000000	/* dump the state before sort	*/
#define TEST_io		0x40000000	/* dump io files		*/
#define TEST_keys	0x20000000	/* dump keys			*/
#define TEST_read	0x10000000	/* force sfread()		*/
#define TEST_show	0x08000000	/* show but don't do		*/
#define TEST_reserve	0x04000000	/* force sfreserve()		*/

/*
 * true if path s names the standard input / standard output:
 * a null pointer, "-", or the matching /dev/std* or /dev/fd/N path
 */

#define pathstdin(s)	(!(s)||streq(s,"-")||streq(s,"/dev/stdin")||streq(s,"/dev/fd/0"))
#define pathstdout(s)	(!(s)||streq(s,"-")||streq(s,"/dev/stdout")||streq(s,"/dev/fd/1"))
267 
/*
 * sfio discipline with file part bookkeeping
 *
 * NOTE(review): presumably restricts a stream to one part of a file;
 * the discipline functions are outside this view -- confirm
 */

typedef struct Part_s
{
	Sfdisc_t	disc;		/* sfio discipline		*/
	off_t		offset;		/* file offset			*/
	off_t		size;		/* total size at offset		*/
	off_t		remain;		/* read size remaining		*/
} Part_t;
275 
/*
 * one entry of the multi-proc job table (Sort_t.jobs)
 */

typedef struct Job_s
{
	off_t		offset;		/* file part offset		*/
	off_t		size;		/* file part size		*/
	size_t		chunk;		/* file part chunk		*/
	int		intermediates;	/* number of intermediate files	*/
} Job_t;
283 
/*
 * sort command state
 *
 * init() zeroes the whole struct before filling it in
 */

typedef struct Sort_s
{
	Rskeydisc_t	disc;		/* rskey discipline		*/
	Rs_t*		rec;		/* rsopen() context		*/
	Rskey_t*	key;		/* rskeyopen() context		*/
	Rsdefkey_f	defkeyf;	/* real defkeyf if TEST_keys	*/
	Sfio_t*		tp;		/* TEST_keys tmp stream		*/
	Sfio_t*		op;		/* output stream		*/
	Job_t*		jobs;		/* multi-proc job table		*/
	char*		overwrite;	/* -o input overwrite tmp file	*/
	char*		buf;		/* input buffer			*/
	Sfio_t*		opened;		/* fileopen() peek stream	*/
	size_t		cur;		/* input buffer index		*/
	size_t		hit;		/* input buffer index overflow	*/
	size_t		end;		/* max input buffer index	*/
	size_t		bufsize;	/* input reserve buffer size	*/
	off_t		total;		/* total size of single file	*/
	unsigned long	test;		/* test bit mask		*/
	int		child;		/* in child process		*/
	int		chunk;		/* chunk the input (no merge)	*/
	int		hadstdin;	/* already has - on input	*/
	int		map;		/* sfreserve() input		*/
	int		mfiles;		/* multi-stage files[] count	*/
	int		nfiles;		/* files[] count		*/
	int		xfiles;		/* max files[] count		*/
	int		preserve;	/* rename() tmp output to input	*/
	int		single;		/* one input file		*/
	int		verbose;	/* trace main actions		*/
	int		zip;		/* sfdcgzip SF_* flags		*/
	/* stream table: 64 entries, or OPEN_MAX-4 when OPEN_MAX <= 68 */
	Sfio_t*		files[OPEN_MAX > 68 ? 64 : (OPEN_MAX-4)];
} Sort_t;
315 
316 /*
317  * optget() info discipline function
318  */
319 
320 static int
optinfo(Opt_t * op,Sfio_t * sp,const char * s,Optdisc_t * dp)321 optinfo(Opt_t* op, Sfio_t* sp, const char* s, Optdisc_t* dp)
322 {
323 	register iconv_list_t*	ic;
324 	register int		n;
325 
326 	if (streq(s, "codesets"))
327 	{
328 		n = 0;
329 		for (ic = iconv_list(NiL); ic; ic = iconv_list(ic))
330 			if (ic->ccode >= 0)
331 				n += sfprintf(sp, "[%c:%s?%s]", ic->match[ic->match[0] == '('], ic->name, ic->desc);
332 		return n;
333 	}
334 	else if (streq(s, "methods"))
335 		return rskeylist(NiL, sp, 1);
336 	return 0;
337 }
338 
339 /*
340  * handle RS_VERIFY event
341  */
342 
343 static int
verify(Rs_t * rs,int op,Void_t * data,Void_t * arg,Rsdisc_t * disc)344 verify(Rs_t* rs, int op, Void_t* data, Void_t* arg, Rsdisc_t* disc)
345 {
346 	if (op == RS_VERIFY)
347 		error(3, "disorder at record %lld", (Sflong_t)((Rsobj_t*)data)->order);
348 	return 0;
349 }
350 
351 static int
verify_silent(Rs_t * rs,int op,Void_t * data,Void_t * arg,Rsdisc_t * disc)352 verify_silent(Rs_t* rs, int op, Void_t* data, Void_t* arg, Rsdisc_t* disc)
353 {
354 	if (op == RS_VERIFY)
355 		exit(1);
356 	return 0;
357 }
358 
359 /*
360  * return read stream for path
361  */
362 
363 static Sfio_t*
fileopen(register Sort_t * sp,const char * path)364 fileopen(register Sort_t* sp, const char* path)
365 {
366 	Sfio_t*		fp;
367 
368 	if (fp = sp->opened)
369 		sp->opened = 0;
370 	else
371 	{
372 		if (pathstdin(path))
373 		{
374 			if (sp->hadstdin)
375 				error(3, "%s: can only read once", path);
376 			sp->hadstdin = 1;
377 			fp = sfstdin;
378 		}
379 		else if (!(fp = sfopen(NiL, path, "r")))
380 			error(ERROR_SYSTEM|3, "%s: cannot open", path);
381 		if (rsfileread(sp->rec, fp, path))
382 		{
383 			if (fp != sfstdin)
384 				sfclose(fp);
385 			return 0;
386 		}
387 		sfset(fp, SF_SHARE, 0);
388 		if (sp->zip & SF_READ)
389 			sfdcgzip(fp, 0);
390 	}
391 	return fp;
392 }
393 
/*
 * do-nothing exit function
 *
 * showplugins() installs this as error_info.exit so that
 * ERROR_USAGE|4 messages do not exit
 */

static void
noexit(int code)
{
	(void)code;
}
402 
403 /*
404  * list info for one plugin on the standard error
405  */
406 
407 static void
showlib(Sort_t * sp,Rskey_t * kp,const char * name,const char * style)408 showlib(Sort_t* sp, Rskey_t* kp, const char* name, const char* style)
409 {
410 	char*		args;
411 	char		buf[128];
412 
413 	if (style)
414 		sfsprintf(args = buf, sizeof(buf), "%s,%s", name, style);
415 	else
416 		args = (char*)name;
417 	if (!rslib(sp->rec, kp, args, RS_IGNORE) && !style)
418 		sfprintf(sfstdout, "%s\t--library=%s,${style}\n", name, name);
419 }
420 
421 /*
422  * list info for all [selected] plugins on the standard error
423  */
424 
425 static int
showplugins(Sort_t * sp,Rskey_t * kp,const char * style)426 showplugins(Sort_t* sp, Rskey_t* kp, const char* style)
427 {
428 	Dllscan_t*	dls;
429 	Dllent_t*	dle;
430 	char*		name;
431 	void		(*oexit)(int);
432 
433 	if (streq(style, "list"))
434 		style = 0;
435 	else
436 	{
437 		oexit = error_info.exit;
438 		error_info.exit = noexit;
439 	}
440 	if (*kp->input)
441 		while (name = *kp->input++)
442 			showlib(sp, kp, name, style);
443 	else if (dls = dllsopen("sort", NiL, NiL))
444 	{
445 		while (dle = dllsread(dls))
446 			showlib(sp, kp, dle->name, style);
447 		dllsclose(dls);
448 	}
449 	if (style)
450 		error_info.exit = oexit;
451 	return 0;
452 }
453 
/*
 * singly linked list node for one -l library argument;
 * parse() queues these in command line order and loads them
 * with rslib() after the options have been processed
 */

struct Lib_s; typedef struct Lib_s Lib_t;

struct Lib_s
{
	Lib_t*		next;		/* next library in list		*/
	char*		name;		/* -l option argument		*/
};
461 
462 /*
463  * process argv as in sort(1)
464  */
465 
466 static int
parse(register Sort_t * sp,char ** argv)467 parse(register Sort_t* sp, char** argv)
468 {
469 	register Rskey_t*	key = sp->key;
470 	register int		n;
471 	register char*		s;
472 	char*			e;
473 	char*			p;
474 	char**			a;
475 	char**			v;
476 	size_t			z;
477 	int			i;
478 	int			map;
479 	char*			plugins = 0;
480 	Lib_t*			firstlib = 0;
481 	Lib_t*			lastlib = 0;
482 	Lib_t*			lib;
483 	Recfmt_t		r;
484 	Mbstate_t		q;
485 	int			obsolescent = 1;
486 	char			opt[64];
487 	Optdisc_t		optdisc;
488 	struct stat		st;
489 
490 	optinit(&optdisc, optinfo);
491 	for (;;)
492 	{
493 		switch (optget(argv, usage))
494 		{
495 		case 0:
496 			break;
497 		case 'c':
498 		case 'C':
499 			obsolescent = 0;
500 			key->meth = Rsverify;
501 			key->disc->events = RS_VERIFY;
502 			key->disc->eventf = opt_info.option[1] == 'C' ? verify_silent : verify;
503 			continue;
504 		case 'E':
505 		case 'J':
506 			sfsprintf(opt, sizeof(opt), "%c%s", opt_info.option[1], opt_info.arg);
507 			if (rskeyopt(key, opt, 1))
508 				return 0;
509 			continue;
510 		case 'j':
511 			key->nproc = opt_info.num;
512 			continue;
513 		case 'k':
514 			if (rskey(key, opt_info.arg, 0))
515 				return -1;
516 			continue;
517 		case 'l':
518 			if (!(lib = newof(0, Lib_t, 1, 0)))
519 				error(ERROR_SYSTEM|3, "out of space");
520 			lib->name = opt_info.arg;
521 			if (lastlib)
522 				lastlib = lastlib->next = lib;
523 			else
524 				firstlib = lastlib = lib;
525 			continue;
526 		case 'm':
527 			key->merge = !!opt_info.num;
528 			continue;
529 		case 'o':
530 			key->output = opt_info.arg;
531 			continue;
532 		case 's':
533 			if (opt_info.num)
534 				key->type &= ~RS_DATA;
535 			else
536 				key->type |= RS_DATA;
537 			continue;
538 		case 't':
539 			if (key->tab[0])
540 				error(1, "%s: %s conflicts with %s", opt_info.option, *opt_info.arg, key->tab);
541 			mbtinit(&q);
542 			if ((n = mbtsize(opt_info.arg, MB_LEN_MAX, &q)) < 1)
543 			{
544 				error(1, "%s: %s: invalid tab character", opt_info.option, opt_info.arg);
545 				n = 0;
546 			}
547 			if (*(opt_info.arg + n) || n >= sizeof(key->tab))
548 				error(1, "%s: %s: single character expected", opt_info.option, opt_info.arg);
549 			memcpy(key->tab, opt_info.arg, n);
550 			key->tab[n] = 0;
551 			continue;
552 		case 'u':
553 			key->type &= ~RS_DATA;
554 			key->type |= RS_UNIQ;
555 			continue;
556 		case 'v':
557 			key->verbose = !!opt_info.num;
558 			sp->verbose = key->verbose || (key->test & TEST_show);
559 			continue;
560 		case 'x':
561 			if (!(key->meth = rskeymeth(key, opt_info.arg)))
562 				error(2, "%s: unknown method", opt_info.arg);
563 			continue;
564 		case 'z':
565 			if (isalpha(n = *(s = opt_info.arg)))
566 				s++;
567 			else
568 				n = 'r';
569 		size:
570 			z = strton(s, &e, NiL, 1);
571 			if (*e == '%')
572 			{
573 				error(2, "%s %c%s: %% not supported -- do you really want that much memory?", opt_info.option, n, s);
574 				return -1;
575 			}
576 			if (*e || z < ((n == 'm' || n == 'o' || n == 'r' || isupper(n)) ? 0 : 512))
577 			{
578 				error(2, "%s %c%s: invalid size", opt_info.option, n, s);
579 				return -1;
580 			}
581 			switch (n)
582 			{
583 			case 'a':
584 				key->alignsize = z;
585 				break;
586 			case 'b':
587 				sp->bufsize = z;
588 				break;
589 			case 'c':
590 				sp->chunk = 1;
591 				key->alignsize = key->insize = z;
592 				break;
593 			case 'i':
594 				key->insize = z;
595 				break;
596 			case 'm':
597 				if (z <= 0 || z > elementsof(sp->files))
598 					z = elementsof(sp->files);
599 				sp->xfiles = z;
600 				break;
601 			case 'p':
602 				key->procsize = z;
603 				break;
604 			case 'o':
605 				key->outsize = z;
606 				break;
607 			case 'r':
608 				key->recsize = z;
609 				break;
610 			case 'I':
611 				sp->zip |= SF_READ;
612 				break;
613 			case 'O':
614 				sp->zip |= SF_WRITE;
615 				break;
616 			}
617 			continue;
618 		case 'D':
619 			error_info.trace = -opt_info.num;
620 			continue;
621 		case 'K':
622 			if (opt_info.offset)
623 			{
624 				opt_info.offset = 0;
625 				opt_info.index++;
626 			}
627 			if (rskey(key, opt_info.arg, *opt_info.option))
628 				return -1;
629 			continue;
630 		case 'L':
631 			rskeylist(key, sfstdout, 0);
632 			exit(0);
633 		case 'P':
634 			plugins = opt_info.arg;
635 			continue;
636 		case 'R':
637 			key->disc->data = recstr(opt_info.arg, &e);
638 			if (*e)
639 			{
640 				error(2, "%s: invalid record format", opt_info.arg);
641 				return -1;
642 			}
643 			continue;
644 		case 'S':
645 		case 'y':
646 			n = 'p';
647 			s = opt_info.arg;
648 			if (*s && *(e = s + strlen(s) - 1) != '%' && !isalpha(*e))
649 			{
650 				sfsprintf(opt, sizeof(opt), "%ski", s);
651 				s = opt;
652 			}
653 			goto size;
654 		case 'T':
655 			pathtemp(NiL, 0, opt_info.arg, "/TMPPATH", NiL);
656 			continue;
657 		case 'X':
658 			s = opt_info.arg;
659 			opt_info.num = strton(s, &e, NiL, 1);
660 			if (*e)
661 			{
662 				if (streq(s, "dump"))
663 					opt_info.num = TEST_dump;
664 				else if (streq(s, "io"))
665 					opt_info.num = TEST_io;
666 				else if (streq(s, "keys"))
667 					opt_info.num = TEST_keys;
668 				else if (streq(s, "read"))
669 					opt_info.num = TEST_read;
670 				else if (streq(s, "reserve"))
671 					opt_info.num = TEST_reserve;
672 				else if (streq(s, "show"))
673 					opt_info.num = TEST_show;
674 				else if (streq(s, "test"))
675 				{
676 					sfprintf(sfstdout, "ok\n");
677 					exit(0);
678 				}
679 				else
680 					error(1, "%s: unknown test", s);
681 			}
682 			if (*opt_info.option == '+')
683 				key->test &= ~opt_info.num;
684 			else
685 				key->test |= opt_info.num;
686 			sp->test = key->test;
687 			sp->verbose = key->verbose || (key->test & TEST_show);
688 			continue;
689 		case '?':
690 			error(ERROR_USAGE|4, "%s", opt_info.arg);
691 			return 1;
692 		case ':':
693 			error(2, "%s", opt_info.arg);
694 			return -1;
695 		default:
696 			opt[0] = opt_info.option[1];
697 			opt[1] = 0;
698 			if (rskeyopt(key, opt, 1))
699 				return 0;
700 			continue;
701 		}
702 		break;
703 	}
704 	argv += opt_info.index;
705 	if (obsolescent && (opt_info.index <= 1 || !streq(*(argv - 1), "--")))
706 	{
707 		/*
708 		 * check for obsolescent `-o output' after first file operand
709 		 */
710 
711 		a = v = argv;
712 		while (s = *a++)
713 		{
714 			if (*s == '-' && *(s + 1) == 'o')
715 			{
716 				if (!*(s += 2) && !(s = *a++))
717 				{
718 					error(2, "-o: output argument expected");
719 					break;
720 				}
721 				key->output = s;
722 			}
723 			else
724 				*v++ = s;
725 		}
726 		*v = 0;
727 	}
728 	key->input = argv;
729 
730 	/*
731 	 * disciplines have the opportunity to modify key info
732 	 */
733 
734 	while (lib = firstlib)
735 	{
736 		if (rslib(sp->rec, key, lib->name, 0))
737 			return 1;
738 		firstlib = firstlib->next;
739 		free(lib);
740 	}
741 
742 	/*
743 	 * plugins list bails early
744 	 */
745 
746 	if (plugins)
747 		exit(showplugins(sp, key, plugins));
748 
749 	/*
750 	 * record format chicanery
751 	 */
752 
753 	if (map = RECTYPE(key->disc->data) == REC_method && REC_M_INDEX(key->disc->data) == REC_M_path)
754 		for (n = 0, i = -1; p = key->input[n]; n++)
755 			if (s = strrchr(p, '%'))
756 			{
757 				r = recstr(s + 1, &e);
758 				if (!*e || *e == '.' && e > (s + 1))
759 				{
760 					if (r != key->disc->data && i >= 0 && (RECTYPE(r) != REC_variable || RECTYPE(key->disc->data) != REC_variable || REC_V_ATTRIBUTES(r) != REC_V_ATTRIBUTES(key->disc->data)))
761 					{
762 						error(2, "%s: format %s incompatible with %s format %s", p, fmtrec(r, 0), key->input[i], fmtrec(key->disc->data, 0));
763 						return 1;
764 					}
765 					if (RECTYPE(r) != REC_variable || RECTYPE(key->disc->data) != REC_variable || REC_V_SIZE(key->disc->data) < REC_V_SIZE(r))
766 						key->disc->data = r;
767 					i = n;
768 				}
769 			}
770 	if (RECTYPE(key->disc->data) == REC_method && ((n = REC_M_INDEX(key->disc->data)) == REC_M_path || n == REC_M_data))
771 	{
772 		if ((sp->opened = fileopen(sp, key->input[0])) && (s = sfreserve(sp->opened, SF_UNBOUND, SF_LOCKR)))
773 		{
774 			struct stat	st;
775 
776 			z = sfvalue(sp->opened);
777 			if (fstat(sffileno(sp->opened), &st) || st.st_size < z)
778 				st.st_size = 0;
779 			key->disc->data = recfmt(s, z, st.st_size);
780 			sfread(sp->opened, s, 0);
781 		}
782 		else
783 		{
784 			z = sp->opened ? sfvalue(sp->opened) : -1;
785 			key->disc->data = REC_N_TYPE();
786 		}
787 		if (z && key->disc->data == REC_N_TYPE())
788 			error(3, "%s: record format cannot be determined from data sample", key->input[0]);
789 	}
790 	if (RECTYPE(key->disc->data) == REC_fixed)
791 		key->fixed = REC_F_SIZE(key->disc->data);
792 	if (map && key->output && key->disc->data != REC_N_TYPE() && (stat(key->output, &st) || S_ISREG(st.st_mode)))
793 	{
794 		if (p = strrchr(key->output, '/'))
795 			s = p + 1;
796 		else
797 			s = key->output;
798 		if (!strchr(s, '%'))
799 		{
800 			p = key->output;
801 			if (!(e = strrchr(s, '.')))
802 				e = s + strlen(s);
803 			if (RECTYPE(key->disc->data) == REC_variable && !REC_V_SIZE(key->disc->data))
804 				key->disc->data |= ((1<<15)-1);
805 			if (!(s = strdup(sfprints("%-*.*s%%%s%s", e - p, e - p, p, fmtrec(key->disc->data, 1), e))))
806 				error(ERROR_SYSTEM|3, "out of space");
807 			key->output = s;
808 			if (sp->verbose)
809 				error(0, "%s rename output %s => %s", error_info.id, p, s);
810 		}
811 	}
812 	if (sp->verbose)
813 		error(0, "%s %s record format", error_info.id, fmtrec(key->disc->data, 0));
814 	return error_info.errors != 0;
815 }
816 
817 /*
818  * dump keys to stderr
819  */
820 
821 static ssize_t
dumpkey(Rs_t * rs,unsigned char * dat,size_t datlen,unsigned char * key,size_t keylen,Rsdisc_t * disc)822 dumpkey(Rs_t* rs, unsigned char* dat, size_t datlen, unsigned char* key, size_t keylen, Rsdisc_t* disc)
823 {
824 	Sort_t*	sp = (Sort_t*)RSKEYDISC(disc);
825 	ssize_t	n;
826 	int	i;
827 	char	buf[2];
828 
829 	if ((n = (*sp->defkeyf)(rs, dat, datlen, key, keylen, disc)) > 0)
830 	{
831 		buf[1] = 0;
832 		for (i = 0; i < n; i++)
833 		{
834 			buf[0] = key[i];
835 			sfputr(sp->tp, fmtesc(buf), -1);
836 		}
837 		sfprintf(sfstderr, "key: %s\n", sfstruse(sp->tp));
838 	}
839 	return n;
840 }
841 
842 /*
843  * initialize sp from argv
844  */
845 
846 static int
init(register Sort_t * sp,Rskeydisc_t * dp,char ** argv)847 init(register Sort_t* sp, Rskeydisc_t* dp, char** argv)
848 {
849 	register Rskey_t*	key;
850 	register char*		s;
851 	register char**		p;
852 	char*			t;
853 	int			n;
854 	unsigned long		x;
855 	unsigned long		z;
856 	size_t			fixed;
857 	struct stat		is;
858 	struct stat		os;
859 
860 	memset(sp, 0, sizeof(*sp));
861 	sp->xfiles = elementsof(sp->files);
862 	sfset(sfstdout, SF_SHARE, 0);
863 	sfset(sfstderr, SF_SHARE, 0);
864 	Vmdcsbrk->round = INBRK;
865 	dp->version = RSKEY_VERSION;
866 	dp->flags = 0;
867 	dp->errorf = errorf;
868 	if (!(sp->key = key = rskeyopen(dp, NiL)) || !(sp->rec = rsnew(key->disc)))
869 		return -1;
870 	z = key->insize = 2 * INMAX;
871 #if 0
872 	if (conformance(0, 0))
873 #endif
874 	key->type |= RS_DATA;
875 	if ((n = strtol(astconf("PAGESIZE", NiL, NiL), &t, 0)) > 0 && !*t)
876 		key->alignsize = n;
877 	if ((n = parse(sp, argv)) || rskeyinit(key))
878 	{
879 		rskeyclose(key);
880 		if (n < 0)
881 			error(ERROR_USAGE|4, "%s", optusage(NiL));
882 		return -1;
883 	}
884 
885 	/*
886 	 * finalize the buffer dimensions
887 	 */
888 
889 	if ((x = key->insize) != z)
890 		z = 0;
891 	if (x > INMAX)
892 		x = INMAX;
893 	else if (x < INMIN && !sp->chunk)
894 		x = INMIN;
895 	if (sp->single = !key->input[1])
896 	{
897 		if (!(sp->opened = fileopen(sp, key->input[0])))
898 			error(ERROR_SYSTEM|3, "%s: cannot open", key->input[0]);
899 		if (fstat(sffileno(sp->opened), &is))
900 			error(ERROR_SYSTEM|3, "%s: cannot stat", key->input[0]);
901 		if (!S_ISREG(is.st_mode) || sp->opened->disc) /* XXX: need sfio call to test if any disc pushed */
902 		{
903 			sp->total = 0;
904 			sp->test |= TEST_read;
905 		}
906 		else if (x > (sp->total = is.st_size))
907 			x = sp->total;
908 	}
909 	else
910 		sp->test |= TEST_read;
911 	if (sp->zip & SF_READ)
912 		sp->test |= TEST_read;
913 	fixed = key->fixed;
914 	if ((sp->test & TEST_reserve) || !(sp->test & TEST_read))
915 	{
916 		sp->map = 1;
917 		if (z)
918 			x = key->procsize;
919 		if (fixed)
920 			x += fixed - x % fixed;
921 	}
922 	else
923 	{
924 		if (z)
925 			x = key->procsize;
926 		for (;;)
927 		{
928 			if (fixed)
929 				x += fixed - x % fixed;
930 			if (sp->buf = (char*)vmalign(Vmheap, x, key->alignsize))
931 				break;
932 			if ((x >>= 1) < INMIN)
933 				error(ERROR_SYSTEM|3, "out of space");
934 		}
935 		sp->hit = x - key->alignsize;
936 	}
937 	if (sp->test & TEST_keys)
938 	{
939 		if (!key->disc->defkeyf)
940 			error(2, "no key function to intercept");
941 		else if (!(sp->tp = sfstropen()))
942 			error(ERROR_SYSTEM|3, "out of space");
943 		else
944 		{
945 			sp->defkeyf = key->disc->defkeyf;
946 			key->disc->defkeyf = dumpkey;
947 		}
948 	}
949 	if (key->nproc > 1)
950 	{
951 		off_t		offset;
952 		off_t		total;
953 		off_t		size;
954 		size_t		chunk;
955 		int		i;
956 		Job_t*		jp;
957 
958 		if (!sp->map || pathstdin(key->input[0]))
959 		{
960 	uno:
961 			key->nproc = 1;
962 		}
963 		else if ((n = (sp->total + key->procsize - 1) / (key->procsize)) <= 1)
964 			goto uno;
965 		else
966 		{
967 			if (n < key->nproc)
968 				key->nproc = n;
969 			else
970 				n = key->nproc;
971 			if (!(sp->jobs = vmnewof(Vmheap, 0, Job_t, n, 0)))
972 				goto uno;
973 			size = (sp->total + n - 1) / n;
974 			if (fixed)
975 			{
976 				if (size % fixed)
977 					size += fixed - size % fixed;
978 				i = (size + x - 1) / x;
979 				if (i * n > sp->xfiles)
980 				{
981 					error(1, "multi-process multi-stage not implemented; falling back to one processor");
982 					goto uno;
983 				}
984 				chunk = size / i;
985 				if (chunk % fixed)
986 					chunk += fixed - chunk % fixed;
987 				size = chunk * i;
988 				offset = 0;
989 				total = sp->total;
990 				for (jp = sp->jobs; jp < sp->jobs + n; jp++)
991 				{
992 					jp->offset = offset;
993 					if (size > total)
994 						size = total;
995 					total -= (jp->size = size);
996 					jp->chunk = chunk;
997 					jp->intermediates = i;
998 					offset += size;
999 				}
1000 				if (key->procsize > chunk)
1001 					key->procsize = chunk;
1002 				else
1003 				{
1004 					size = key->procsize;
1005 					i = (chunk + size - 1) / size;
1006 					size = chunk / i;
1007 					if (size % fixed)
1008 						size += fixed - size % fixed;
1009 					key->procsize = size;
1010 				}
1011 			}
1012 			else
1013 			{
1014 				register char*	s;
1015 				register char*	t;
1016 				register char*	b;
1017 				off_t		ideal;
1018 				off_t		scan;
1019 				char*		file;
1020 				Sfio_t*		ip;
1021 
1022 				i = (size + x - 1) / x;
1023 				if (i * n > sp->xfiles)
1024 				{
1025 					error(1, "multi-process multi-stage not implemented; falling back to one processor");
1026 					goto uno;
1027 				}
1028 				chunk = (size + i - 1) / i;
1029 				size = ideal = chunk * i;
1030 				if ((scan = INREC) >= ideal)
1031 					scan = (ideal / 32) * 4;
1032 				offset = 0;
1033 				total = sp->total;
1034 				file = key->input[0];
1035 				if (!(ip = fileopen(sp, file)))
1036 					error(ERROR_SYSTEM|3, "%s: cannot read", file);
1037 				for (jp = sp->jobs; jp < sp->jobs + n; jp++)
1038 				{
1039 					jp->offset = offset;
1040 					if (((size = ideal) + scan) >= total)
1041 						size = total;
1042 					else
1043 					{
1044 						/*UNDENT...*/
1045 
1046 	/*
1047 	 * snoop around for the closest record boundary
1048 	 */
1049 
1050 	size -= scan / 2;
1051 	if (sfseek(ip, offset + size, SEEK_SET) != (offset + size))
1052 		error(ERROR_SYSTEM|3, "%s: record boundary seek error at offset %lld", file, (Sflong_t)offset + size);
1053 	if (!(b = (char*)sfreserve(ip, scan, 0)))
1054 		error(ERROR_SYSTEM|3, "%s: record boundary read error at offset %lld", file, (Sflong_t)offset + size);
1055 	s = t = b + scan / 2 - 1;
1056 	while (*s++ != '\n')
1057 	{
1058 		if (t < b)
1059 		{
1060 		bigger:
1061 			if (((size += scan) + offset) >= (total - scan))
1062 				error(3, "%s: monster record at offset %lld", (Sflong_t)offset);
1063 			if (sfseek(ip, offset + size, SEEK_SET) != (offset + size))
1064 				error(ERROR_SYSTEM|3, "%s: record boundary input seek error at %lld", file, (Sflong_t)offset + size);
1065 			if (!(b = (char*)sfreserve(ip, scan, 0)))
1066 				error(ERROR_SYSTEM|3, "%s: record boundary read error at %lld", file, (Sflong_t)offset + size);
1067 			t = (s = b) + scan;
1068 			do
1069 			{
1070 				if (s >= t)
1071 					goto bigger;
1072 			} while (*s++ != '\n');
1073 			break;
1074 		}
1075 		if (*t-- == '\n')
1076 		{
1077 			s = t + 2;
1078 			break;
1079 		}
1080 	}
1081 	size += (s - b);
1082 
1083 						/*...INDENT*/
1084 					}
1085 					total -= (jp->size = size);
1086 					jp->chunk = (size + i - 1) / i;
1087 					if (jp->chunk > chunk)
1088 						chunk = jp->chunk;
1089 					jp->intermediates = i;
1090 					offset += size;
1091 				}
1092 				if (rsfileclose(sp->rec, ip))
1093 					return -1;
1094 				sfclose(ip);
1095 				key->procsize = (key->procsize > chunk) ? chunk : chunk / ((chunk + key->procsize - 1) / key->procsize);
1096 			}
1097 		}
1098 	}
1099 	key->insize = sp->end = x;
1100 
1101 	/*
1102 	 * check the output file for clash with the input files
1103 	 */
1104 
1105 	n = stat("/dev/null", &is);
1106 	if (pathstdout(key->output))
1107 	{
1108 		key->output = "/dev/stdout";
1109 		sp->op = sfstdout;
1110 		if (!n && !fstat(sffileno(sp->op), &os) && os.st_dev == is.st_dev && os.st_ino == is.st_ino)
1111 			key->type |= RS_IGNORE;
1112 	}
1113 	else if (key->input)
1114 	{
1115 		if (!stat(key->output, &os))
1116 		{
1117 			if (!n && os.st_dev == is.st_dev && os.st_ino == is.st_ino)
1118 				key->type |= RS_IGNORE;
1119 			else if (eaccess(key->output, W_OK))
1120 				error(ERROR_SYSTEM|3, "%s: cannot write", key->output);
1121 			else if (!fs3d(FS3D_TEST) || !iview(&os))
1122 			{
1123 				p = key->input;
1124 				while (s = *p++)
1125 					if (!pathstdin(s))
1126 					{
1127 						if (stat(s, &is))
1128 							error(ERROR_SYSTEM|2, "%s: not found", s);
1129 						else if (os.st_dev == is.st_dev && os.st_ino == is.st_ino)
1130 						{
1131 							if (t = strrchr(key->output, '/'))
1132 							{
1133 								s = key->output;
1134 								*t = 0;
1135 							}
1136 							else
1137 								s = ".";
1138 							if (sp->overwrite = pathtemp(NiL, 0, s, error_info.id, &n))
1139 								sp->op = sfnew(NiL, NiL, SF_UNBOUND, n, SF_WRITE);
1140 							if (t)
1141 								*t = '/';
1142 							if (!sp->op || fstat(n, &is))
1143 								error(ERROR_SYSTEM|3, "%s: cannot create overwrite file %s", key->output, sp->overwrite);
1144 							if (os.st_uid != is.st_uid || os.st_gid != is.st_gid)
1145 								sp->preserve = 1;
1146 							break;
1147 						}
1148 					}
1149 			}
1150 		}
1151 		if (!sp->overwrite && !(sp->op = sfopen(NiL, key->output, "w")))
1152 			error(ERROR_SYSTEM|3, "%s: cannot write", key->output);
1153 	}
1154 	if (rsfilewrite(sp->rec, sp->op, key->output))
1155 		return -1;
1156 	if (key->outsize > 0)
1157 		sfsetbuf(sp->op, NiL, key->outsize);
1158 	if (sp->zip & SF_WRITE)
1159 		sfdcgzip(sp->op, 0);
1160 
1161 	/*
1162 	 * finally ready for recsort now
1163 	 */
1164 
1165 	if (rsinit(sp->rec, key->meth, key->procsize, key->type, key))
1166 	{
1167 		error(ERROR_SYSTEM|2, "sort library initialization error");
1168 		rskeyclose(key);
1169 		return -1;
1170 	}
1171 	if (sp->rec->meth->type == RS_MTCOPY)
1172 		sp->chunk = 1;
1173 	if (sp->test & TEST_io)
1174 	{
1175 		for (n = 0; s = key->input[n]; n++)
1176 			error(0, "%s input[%d]\t\"%s\"", error_info.id, n, s);
1177 		if (s = key->output)
1178 			error(0, "%s output\t\"%s\"", error_info.id, s);
1179 	}
1180 	return 0;
1181 }
1182 
1183 /*
1184  * close sp->files and push fp if not 0
1185  */
1186 
1187 static void
clear(register Sort_t * sp,Sfio_t * fp)1188 clear(register Sort_t* sp, Sfio_t* fp)
1189 {
1190 	register int	i;
1191 
1192 	for (i = fp ? sp->mfiles : 0; i < sp->nfiles; i++)
1193 	{
1194 		rstempclose(sp->rec, sp->files[i]);
1195 		sp->files[i] = 0;
1196 	}
1197 	if (fp)
1198 	{
1199 		sp->files[sp->mfiles++] = fp;
1200 		sp->nfiles = sp->mfiles;
1201 		if (sp->mfiles >= (sp->xfiles - 1))
1202 			sp->mfiles = 0;
1203 	}
1204 	else
1205 		sp->nfiles = sp->mfiles = 0;
1206 }
1207 
1208 /*
1209  * flush the intermediate data
1210  * r is the partial record offset
1211  * updated r is returned
1212  */
1213 
1214 static ssize_t
flush(register Sort_t * sp,register size_t r)1215 flush(register Sort_t* sp, register size_t r)
1216 {
1217 	register Sfio_t*	fp;
1218 	register size_t		n;
1219 	register size_t		m;
1220 	register size_t		b;
1221 
1222 	if (sp->chunk)
1223 	{
1224 		/*
1225 		 * skip merge and output sorted chunk
1226 		 */
1227 
1228 		if (rswrite(sp->rec, sp->op, RS_OTEXT))
1229 		{
1230 			if (!error_info.errors)
1231 				error(ERROR_SYSTEM|2, "%s: write error", sp->key->output);
1232 			return -1;
1233 		}
1234 	}
1235 	else if (sp->rec->meth->type != RS_MTVERIFY)
1236 	{
1237 		/*
1238 		 * write to an intermediate file and rewind for rsmerge
1239 		 */
1240 
1241 		if (!(fp = sp->files[sp->nfiles]))
1242 		{
1243 			if (sp->child || !(fp = rstempwrite(sp->rec, (Sfio_t*)0)))
1244 				error(ERROR_SYSTEM|3, "cannot create intermediate sort file %d", sp->nfiles);
1245 			sp->files[sp->nfiles] = fp;
1246 		}
1247 		sp->nfiles++;
1248 		if (sp->verbose)
1249 			error(0, "%s write intermediate", error_info.id);
1250 		if (rswrite(sp->rec, fp, 0))
1251 		{
1252 			error(ERROR_SYSTEM|2, "intermediate sort file write error");
1253 			return -1;
1254 		}
1255 		if (rstempread(sp->rec, fp))
1256 		{
1257 			error(ERROR_SYSTEM|2, "intermediate sort file rewind error");
1258 			return -1;
1259 		}
1260 
1261 		/*
1262 		 * multi-stage merge when open file limit exceeded
1263 		 */
1264 
1265 		if (sp->nfiles >= sp->xfiles)
1266 		{
1267 			if (sp->child || !(fp = rstempwrite(sp->rec, (Sfio_t*)0)))
1268 				error(ERROR_SYSTEM|3, "cannot create intermediate merge file");
1269 			if (sp->verbose)
1270 				error(0, "%s merge multi-stage intermediate", error_info.id);
1271 			if (rsmerge(sp->rec, fp, sp->files + sp->mfiles, sp->nfiles - sp->mfiles, 0))
1272 			{
1273 				error(ERROR_SYSTEM|2, "intermediate merge file write error");
1274 				return -1;
1275 			}
1276 			if (rstempread(sp->rec, fp))
1277 			{
1278 				error(ERROR_SYSTEM|2, "intermediate merge file rewind error");
1279 				return -1;
1280 			}
1281 			clear(sp, fp);
1282 		}
1283 	}
1284 
1285 	/*
1286 	 * slide over partial record data so the next read is aligned
1287 	 */
1288 
1289 	if (!sp->map && (m = sp->cur - r))
1290 	{
1291 		n = roundof(m, sp->key->alignsize) - m;
1292 		if (n < r)
1293 		{
1294 			m = n;
1295 			while (r < sp->cur)
1296 				sp->buf[n++] = sp->buf[r++];
1297 			sp->cur = n;
1298 		}
1299 		else
1300 		{
1301 			b = r;
1302 			r += m;
1303 			n += m;
1304 			sp->cur = n;
1305 			while (r > b)
1306 				sp->buf[--n] = sp->buf[--r];
1307 			m = n;
1308 		}
1309 	}
1310 	else
1311 		m = sp->cur = 0;
1312 	return m;
1313 }
1314 
1315 /*
1316  * input the records for file ip
1317  */
1318 
1319 static int
input(register Sort_t * sp,Sfio_t * ip,const char * name,int last)1320 input(register Sort_t* sp, Sfio_t* ip, const char* name, int last)
1321 {
1322 	register ssize_t	n;
1323 	register ssize_t	p;
1324 	register ssize_t	m;
1325 	register ssize_t	r;
1326 	size_t			z;
1327 	Sfoff_t			w;
1328 	char*			b;
1329 	int			c;
1330 	char			del[2];
1331 
1332 	/*
1333 	 * align the read buffer and
1334 	 * loop on insize chunks
1335 	 */
1336 
1337 	error_info.file = ip == sfstdin ? (char*)0 : (char*)name;
1338 	m = -1;
1339 	z = 0;
1340 	if (sp->bufsize)
1341 		sfsetbuf(ip, NiL, sp->bufsize);
1342 	else if (sfsize(ip) > SF_BUFSIZE)
1343 	{
1344 		if (sp->map)
1345 			sfsetbuf(ip, NiL, z = sp->end);
1346 		else
1347 			sfsetbuf(ip, NiL, 0);
1348 	}
1349 	if (sp->map)
1350 		w = sfsize(ip);
1351 	r = sp->cur = roundof(sp->cur, sp->key->alignsize);
1352 	p = 0;
1353 	for (;;)
1354 	{
1355 		if (sp->cur > sp->hit)
1356 		{
1357 			if (sp->single && !sp->nfiles && sp->total == (sp->map ? 0 : p))
1358 				break;
1359 			if ((r = flush(sp, r)) < 0)
1360 				return -1;
1361 		}
1362 		if (!sp->map)
1363 		{
1364 			n = sfeof(ip) ? 0 : sfread(ip, sp->buf + sp->cur, sp->end - sp->cur);
1365 			if (last)
1366 			{
1367 				if ((c = sfgetc(ip)) == EOF)
1368 					sp->rec->type |= RS_LAST;
1369 				else
1370 					sfungetc(ip, c);
1371 			}
1372 		}
1373 		else
1374 		{
1375 			sp->buf = (char*)sfreserve(ip, m, SF_LOCKR);
1376 			n = sfvalue(ip);
1377 			if (!sp->buf)
1378 			{
1379 				if (m < 0 && n < -m && z == sp->end)
1380 				{
1381 					sfsetbuf(ip, NiL, z = 2 * sp->end);
1382 					sp->buf = (char*)sfreserve(ip, m, SF_LOCKR);
1383 					n = sfvalue(ip);
1384 					if (sp->verbose && n)
1385 						error(0, "%s buffer boundary expand to %I*d", error_info.id, sizeof(n), n);
1386 				}
1387 				if (!sp->buf && n > 0 && !(sp->buf = sfreserve(ip, n, SF_LOCKR)))
1388 					n = -1;
1389 			}
1390 			if (last && w == (sftell(ip) + n))
1391 				sp->rec->type |= RS_LAST;
1392 			if (sp->verbose)
1393 				error(0, "%s reserve %*d => %*d", error_info.id, sizeof(m), m, sizeof(n), n);
1394 		}
1395 		if (n <= 0)
1396 		{
1397 			if (n < 0)
1398 				error(ERROR_SYSTEM|3, "read error");
1399 			if (sp->cur <= r)
1400 				break;
1401 			if (sp->key->fixed)
1402 			{
1403 				error(1, "incomplete record length=%lld", (Sflong_t)(sp->cur - r));
1404 				break;
1405 			}
1406 			if (RECTYPE(sp->key->disc->data) == REC_delimited && sp->buf[sp->cur - 1] != (c = REC_D_DELIMITER(sp->key->disc->data)))
1407 			{
1408 				sp->buf[sp->cur++] = c;
1409 				if (c == '\n')
1410 					error(1, "newline appended");
1411 				else
1412 				{
1413 					del[0] = c;
1414 					del[1] = 0;
1415 					error(1, "%s appended", fmtquote(del, "'", NiL, 1, 0));
1416 				}
1417 			}
1418 		}
1419 		sp->cur += n;
1420 	process:
1421 		if (sp->verbose && !sp->child)
1422 			error(ERROR_PROMPT, "%s process %lld ->", error_info.id, (Sflong_t)(sp->cur - r));
1423 		if ((p = rsprocess(sp->rec, sp->buf + r, sp->cur - r)) < 0)
1424 			error(ERROR_SYSTEM|3, "sort error");
1425 		if (sp->verbose)
1426 		{
1427 			if (sp->child)
1428 				error(0, "%s process %lld -> %lld", error_info.id, (Sflong_t)(sp->cur - r), (Sflong_t)p);
1429 			else
1430 				error(0, " %lld", (Sflong_t)p);
1431 		}
1432 		if (sp->map)
1433 		{
1434 			if (sp->map > 2)
1435 				break;
1436 			sfread(ip, sp->buf, p);
1437 			if (p)
1438 			{
1439 				m = -(n - p + 1);
1440 				if (((sp->total -= p) / 3) < (sp->end / 2) && sp->total > sp->end)
1441 				{
1442 					if ((r = flush(sp, r)) < 0)
1443 						return -1;
1444 					sfsetbuf(ip, NiL, sp->total);
1445 				}
1446 			}
1447 			else if (sp->map == 1)
1448 			{
1449 				sp->map++;
1450 				m = -(n + 1);
1451 			}
1452 			else if (n > sp->end)
1453 			{
1454 				error(2, "monster record", n, sp->end, sp->cur, p);
1455 				break;
1456 			}
1457 			else if (sp->key->fixed)
1458 			{
1459 				error(1, "incomplete record length=%ld", n - p);
1460 				break;
1461 			}
1462 			else
1463 			{
1464 				sp->cur = n - p;
1465 				if (!(b = vmnewof(Vmheap, 0, char, sp->cur, 1)))
1466 					error(ERROR_SYSTEM|3, "out of space");
1467 				memcpy(b, sp->buf + p, sp->cur);
1468 				if (RECTYPE(sp->key->disc->data) == REC_delimited && b[sp->cur - 1] != (c = REC_D_DELIMITER(sp->key->disc->data)))
1469 				{
1470 					b[sp->cur++] = '\n';
1471 					if (c == '\n')
1472 						error(1, "newline appended");
1473 					else
1474 					{
1475 						del[0] = c;
1476 						del[1] = 0;
1477 						error(1, "%s appended", fmtquote(del, "'", "'", 1, 0));
1478 					}
1479 				}
1480 				sp->buf = b;
1481 				sp->map++;
1482 				goto process;
1483 			}
1484 		}
1485 		else
1486 			r += p;
1487 	}
1488 	error_info.file = 0;
1489 	return 0;
1490 }
1491 
1492 /*
1493  * sfio part discipline read
1494  */
1495 
1496 static ssize_t
partread(Sfio_t * fp,Void_t * buf,size_t size,Sfdisc_t * dp)1497 partread(Sfio_t* fp, Void_t* buf, size_t size, Sfdisc_t* dp)
1498 {
1499 	register Part_t*	pp = (Part_t*)dp;
1500 
1501 	if (pp->remain <= 0)
1502 		return 0;
1503 	if (size > pp->remain)
1504 		size = pp->remain;
1505 	pp->remain -= size;
1506 	return sfrd(fp, buf, size, dp);
1507 }
1508 
1509 /*
1510  * sfio part discipline seek
1511  */
1512 
1513 static Sfoff_t
partseek(Sfio_t * fp,Sfoff_t lloffset,int op,Sfdisc_t * dp)1514 partseek(Sfio_t* fp, Sfoff_t lloffset, int op, Sfdisc_t* dp)
1515 {
1516 	register Part_t*	pp = (Part_t*)dp;
1517 	off_t			offset = lloffset;
1518 
1519 	switch (op)
1520 	{
1521 	case SEEK_SET:
1522 		offset += pp->offset;
1523 		break;
1524 	case SEEK_CUR:
1525 		offset += pp->offset;
1526 		break;
1527 	case SEEK_END:
1528 		offset = pp->offset + pp->size - offset;
1529 		op = SEEK_SET;
1530 		break;
1531 	}
1532 	if ((offset = sfsk(fp, offset, op, dp)) >= 0)
1533 	{
1534 		offset -= pp->offset;
1535 		pp->remain = pp->size - offset;
1536 	}
1537 	return offset;
1538 }
1539 
1540 /*
1541  * job control
1542  * requires single named input file
1543  * no multi-stage merge
1544  */
1545 
1546 static void
jobs(register Sort_t * sp)1547 jobs(register Sort_t* sp)
1548 {
1549 	register Job_t*	jp;
1550 	register Job_t*	xp;
1551 	register int	i;
1552 	register int	j;
1553 	register int	f;
1554 	int		status;
1555 	char*		file;
1556 	Sfio_t*		ip;
1557 	Part_t		part;
1558 	char		id[32];
1559 
1560 	sp->single = 0;
1561 	if (sp->verbose)
1562 		error(0, "%s %d processes %lld total", error_info.id, sp->key->nproc, (Sflong_t)sp->total);
1563 	xp = sp->jobs + sp->key->nproc;
1564 	if (sp->test & TEST_show)
1565 	{
1566 		for (jp = sp->jobs; jp < xp; jp++)
1567 			error(0, "%s#%d pos %12lld : len %10lld : buf %10lld : num %2d", error_info.id, jp - sp->jobs + 1, (Sflong_t)jp->offset, (Sflong_t)jp->size, (Sflong_t)jp->chunk, jp->intermediates);
1568 		exit(0);
1569 	}
1570 	f = 0;
1571 	for (jp = sp->jobs; jp < xp; jp++)
1572 		for (i = 0; i < jp->intermediates; i++)
1573 			if (!(sp->files[f++] = rstempwrite(sp->rec, (Sfio_t*)0)))
1574 				error(ERROR_SYSTEM|3, "cannot create intermediate file %d", i);
1575 	part.disc.readf = partread;
1576 	part.disc.writef = 0;
1577 	part.disc.seekf = partseek;
1578 	part.disc.exceptf = 0;
1579 	part.disc.disc = 0;
1580 	file = sp->key->input[0];
1581 	j = 0;
1582 	for (jp = sp->jobs; jp < xp; jp++)
1583 	{
1584 		ip = fileopen(sp, file);
1585 		switch (fork())
1586 		{
1587 		case -1:
1588 			error(ERROR_SYSTEM|3, "not enough child processes");
1589 		case 0:
1590 			sp->child = 1;
1591 			sfsprintf(id, sizeof(id), "%s#%d", error_info.id, jp - sp->jobs + 1);
1592 			error_info.id = id;
1593 			sp->end = jp->chunk;
1594 			part.offset = jp->offset;
1595 			sp->total = part.size = part.remain = jp->size;
1596 			sfdisc(ip, &part.disc);
1597 			for (i = 0; i < jp->intermediates; i++)
1598 				sp->files[i] = sp->files[j++];
1599 			while (i < f)
1600 				sp->files[i++] = 0;
1601 			if (sp->verbose)
1602 				error(0, "%s pos %12lld : len %10lld : buf %10lld : num %2d", error_info.id, (Sflong_t)jp->offset, (Sflong_t)jp->size, (Sflong_t)jp->chunk, jp->intermediates);
1603 			exit(input(sp, ip, file, 0) < 0);
1604 		}
1605 		if (rsfileclose(sp->rec, ip))
1606 			exit(1);
1607 		sfclose(ip);
1608 		j += jp->intermediates;
1609 	}
1610 	sp->nfiles = f;
1611 	i = 0;
1612 	j = sp->key->nproc;
1613 	while (j > 0)
1614 	{
1615 		if (wait(&status) != -1)
1616 		{
1617 			if (status)
1618 				i++;
1619 			j--;
1620 		}
1621 		else if (errno != EINTR)
1622 		{
1623 			error(ERROR_SYSTEM|3, "%d process%s did not complete", j, j == 1 ? "" : "es");
1624 			break;
1625 		}
1626 	}
1627 	if (i)
1628 		error(3, "%d child process%s failed", i, i == 1 ? "" : "es");
1629 }
1630 
1631 /*
1632  * all done
1633  */
1634 
1635 static void
done(register Sort_t * sp)1636 done(register Sort_t* sp)
1637 {
1638 	while (rsdisc(sp->rec, NiL, RS_POP));
1639 	if ((sfsync(sp->op) || sp->op != sfstdout && rsfileclose(sp->rec, sp->op)) && !error_info.errors)
1640 		error(ERROR_SYSTEM|2, "%s: write error", sp->key->output);
1641 	if (rsclose(sp->rec))
1642 		error(2, "sort error");
1643 	if (sp->map > 2)
1644 		vmfree(Vmheap, sp->buf);
1645 
1646 	/*
1647 	 * if the output would have overwritten an input
1648 	 * file now is the time to commit to it
1649 	 */
1650 
1651 	if (sp->overwrite)
1652 	{
1653 		if (error_info.errors)
1654 			remove(sp->overwrite);
1655 		else if (sp->preserve)
1656 		{
1657 			Sfio_t*	ip;
1658 			Sfio_t*	op;
1659 
1660 			if (ip = sfopen(NiL, sp->overwrite, "r"))
1661 			{
1662 				if (op = sfopen(NiL, sp->key->output, "w"))
1663 				{
1664 					if ((sfmove(ip, op, SF_UNBOUND, -1) < 0 || sfclose(op) || !sfeof(ip)) && !error_info.errors)
1665 						error(ERROR_SYSTEM|2, "%s: write error", sp->key->output);
1666 					sfclose(op);
1667 				}
1668 				else
1669 					error(ERROR_SYSTEM|2, "%s: cannot write", sp->key->output);
1670 				sfclose(ip);
1671 				remove(sp->overwrite);
1672 			}
1673 			else
1674 				error(ERROR_SYSTEM|2, "%s: cannot read", sp->overwrite);
1675 			sp->preserve = 0;
1676 		}
1677 		else if (remove(sp->key->output) || rename(sp->overwrite, sp->key->output))
1678 			error(ERROR_SYSTEM|2, "%s: cannot overwrite", sp->key->output);
1679 		free(sp->overwrite);
1680 		sp->overwrite = 0;
1681 	}
1682 	if (rskeyclose(sp->key))
1683 		error(2, "sort key error");
1684 }
1685 
1686 int
main(int argc,char ** argv)1687 main(int argc, char** argv)
1688 {
1689 	register char*		s;
1690 	register Sfio_t*	fp;
1691 	char*			last;
1692 	char**			merge;
1693 	int			i;
1694 	struct stat		st;
1695 	Sort_t			sort;
1696 
1697 	error_info.id = "sort";
1698 	if (init(&sort, &sort.disc, argv))
1699 		exit(1);
1700 	if (sort.test & TEST_dump)
1701 	{
1702 		sfprintf(sfstderr, "main\n\tintermediates=%d\n", sort.xfiles);
1703 		rskeydump(sort.key, sfstderr);
1704 	}
1705 	if (sort.key->type & RS_CAT)
1706 	{
1707 		while (s = *sort.key->input++)
1708 		{
1709 			fp = fileopen(&sort, s);
1710 			if (sfmove(fp, sfstdout, SF_UNBOUND, -1) < 0 || !sfeof(fp))
1711 				error(ERROR_SYSTEM|2, "%s: read error", s);
1712 			if (sferror(sfstdout) || rsfileclose(sort.rec, fp))
1713 				break;
1714 		}
1715 	}
1716 	else
1717 	{
1718 		merge = sort.key->merge && sort.key->input[0] && sort.key->input[1] ? sort.key->input : (char**)0;
1719 		fp = 0;
1720 		if (sort.jobs)
1721 			jobs(&sort);
1722 		else if (sort.test & TEST_show)
1723 			exit(0);
1724 		else
1725 		{
1726 			last = 0;
1727 			if (!merge && (sort.rec->type & RS_MORE))
1728 				for (i = 0; s = sort.key->input[i]; i++)
1729 					if ((pathstdin(s) && !fstat(0, &st) || !stat(s, &st)) && st.st_size)
1730 						last = s;
1731 			while (s = *sort.key->input++)
1732 			{
1733 				fp = fileopen(&sort, s);
1734 				if (merge)
1735 				{
1736 					if (sort.nfiles >= sort.xfiles)
1737 					{
1738 						clear(&sort, NiL);
1739 						sort.key->input = merge;
1740 						merge = 0;
1741 						if (fp != sfstdin)
1742 							sfclose(fp);
1743 						fp = 0;
1744 						continue;
1745 					}
1746 					sort.files[sort.nfiles++] = fp;
1747 				}
1748 				else if (input(&sort, fp, s, s == last) < 0)
1749 					break;
1750 				else if (fp != sfstdin && !sort.map)
1751 				{
1752 					sfclose(fp);
1753 					fp = 0;
1754 				}
1755 			}
1756 		}
1757 		if (sort.nfiles)
1758 		{
1759 			if (sort.cur && flush(&sort, sort.cur) < 0)
1760 				return 1;
1761 			if (sort.verbose)
1762 				error(0, "%s merge text", error_info.id);
1763 			if (rsmerge(sort.rec, sort.op, sort.files, sort.nfiles, merge ? RS_TEXT : RS_OTEXT))
1764 				error(ERROR_SYSTEM|3, "merge error");
1765 			clear(&sort, NiL);
1766 		}
1767 		else
1768 		{
1769 			if (sort.verbose)
1770 				error(0, "%s write text", error_info.id);
1771 			if (rswrite(sort.rec, sort.op, RS_OTEXT) && !error_info.errors)
1772 				error(ERROR_SYSTEM|3, "%s: write error", sort.key->output);
1773 			if (fp && fp != sfstdin)
1774 				sfclose(fp);
1775 		}
1776 	}
1777 	done(&sort);
1778 	exit(error_info.errors != 0);
1779 }
1780