xref: /illumos-gate/usr/src/cmd/sort/streams_stdio.c (revision 101e15b5)
1*101e15b5SRichard Lowe /*
2*101e15b5SRichard Lowe  * CDDL HEADER START
3*101e15b5SRichard Lowe  *
4*101e15b5SRichard Lowe  * The contents of this file are subject to the terms of the
5*101e15b5SRichard Lowe  * Common Development and Distribution License, Version 1.0 only
6*101e15b5SRichard Lowe  * (the "License").  You may not use this file except in compliance
7*101e15b5SRichard Lowe  * with the License.
8*101e15b5SRichard Lowe  *
9*101e15b5SRichard Lowe  * You can obtain a copy of the license at usr/src/OPENSOLARIS.LICENSE
10*101e15b5SRichard Lowe  * or http://www.opensolaris.org/os/licensing.
11*101e15b5SRichard Lowe  * See the License for the specific language governing permissions
12*101e15b5SRichard Lowe  * and limitations under the License.
13*101e15b5SRichard Lowe  *
14*101e15b5SRichard Lowe  * When distributing Covered Code, include this CDDL HEADER in each
15*101e15b5SRichard Lowe  * file and include the License file at usr/src/OPENSOLARIS.LICENSE.
16*101e15b5SRichard Lowe  * If applicable, add the following below this CDDL HEADER, with the
17*101e15b5SRichard Lowe  * fields enclosed by brackets "[]" replaced with your own identifying
18*101e15b5SRichard Lowe  * information: Portions Copyright [yyyy] [name of copyright owner]
19*101e15b5SRichard Lowe  *
20*101e15b5SRichard Lowe  * CDDL HEADER END
21*101e15b5SRichard Lowe  */
22*101e15b5SRichard Lowe /*
23*101e15b5SRichard Lowe  * Copyright 2004 Sun Microsystems, Inc.  All rights reserved.
24*101e15b5SRichard Lowe  * Use is subject to license terms.
25*101e15b5SRichard Lowe  */
26*101e15b5SRichard Lowe 
27*101e15b5SRichard Lowe #include "streams_stdio.h"
28*101e15b5SRichard Lowe #include "streams_common.h"
29*101e15b5SRichard Lowe 
30*101e15b5SRichard Lowe #define	SHELF_OCCUPIED	1
31*101e15b5SRichard Lowe #define	SHELF_VACANT	0
32*101e15b5SRichard Lowe static int shelf = SHELF_VACANT;
33*101e15b5SRichard Lowe 
34*101e15b5SRichard Lowe /*
35*101e15b5SRichard Lowe  * Single-byte character file i/o-based streams implementation
36*101e15b5SRichard Lowe  *
37*101e15b5SRichard Lowe  *   The routines in this file contain the implementation of the i/o streams
38*101e15b5SRichard Lowe  *   interface for those situations where the input is via stdio.
39*101e15b5SRichard Lowe  *
40*101e15b5SRichard Lowe  * The "shelf"
41*101e15b5SRichard Lowe  *   In the case where the input buffer contains insufficient room to hold the
42*101e15b5SRichard Lowe  *   entire line, the fractional line is shelved, and will be grafted to on the
43*101e15b5SRichard Lowe  *   subsequent read.
44*101e15b5SRichard Lowe  */
45*101e15b5SRichard Lowe int
stream_stdio_open_for_write(stream_t * str)46*101e15b5SRichard Lowe stream_stdio_open_for_write(stream_t *str)
47*101e15b5SRichard Lowe {
48*101e15b5SRichard Lowe 	stream_simple_file_t	*SF = &(str->s_type.SF);
49*101e15b5SRichard Lowe 
50*101e15b5SRichard Lowe 	ASSERT(!(str->s_status & STREAM_OPEN));
51*101e15b5SRichard Lowe 	ASSERT(!(str->s_status & STREAM_OUTPUT));
52*101e15b5SRichard Lowe 
53*101e15b5SRichard Lowe 	if (str->s_status & STREAM_NOTFILE)
54*101e15b5SRichard Lowe 		SF->s_fd = fileno(stdout);
55*101e15b5SRichard Lowe 	else
56*101e15b5SRichard Lowe 		if ((SF->s_fd = open(str->s_filename, O_CREAT | O_TRUNC |
57*101e15b5SRichard Lowe 		    O_WRONLY, OUTPUT_MODE)) < 0) {
58*101e15b5SRichard Lowe 			if (errno == EMFILE || errno == ENFILE)
59*101e15b5SRichard Lowe 				return (-1);
60*101e15b5SRichard Lowe 			else
61*101e15b5SRichard Lowe 				die(EMSG_OPEN, str->s_filename);
62*101e15b5SRichard Lowe 		}
63*101e15b5SRichard Lowe 
64*101e15b5SRichard Lowe 	stream_set(str, STREAM_OPEN | STREAM_OUTPUT);
65*101e15b5SRichard Lowe 
66*101e15b5SRichard Lowe 	return (1);
67*101e15b5SRichard Lowe }
68*101e15b5SRichard Lowe 
69*101e15b5SRichard Lowe /*
70*101e15b5SRichard Lowe  * In the case of an instantaneous stream, we allocate a small buffer (64k) here
71*101e15b5SRichard Lowe  * for the stream; otherwise, the s_buffer and s_buffer_size members should have
72*101e15b5SRichard Lowe  * been set by stream_set_size() prior to calling stream_prime().
73*101e15b5SRichard Lowe  *
74*101e15b5SRichard Lowe  * Repriming (priming an already primed stream) is done when we are reentering a
75*101e15b5SRichard Lowe  * file after having sorted a previous portion of the file.
76*101e15b5SRichard Lowe  */
77*101e15b5SRichard Lowe static int
stream_stdio_prime(stream_t * str)78*101e15b5SRichard Lowe stream_stdio_prime(stream_t *str)
79*101e15b5SRichard Lowe {
80*101e15b5SRichard Lowe 	stream_buffered_file_t *BF = &(str->s_type.BF);
81*101e15b5SRichard Lowe 	char *current_position;
82*101e15b5SRichard Lowe 	char *end_of_buffer;
83*101e15b5SRichard Lowe 	char *next_nl;
84*101e15b5SRichard Lowe 
85*101e15b5SRichard Lowe 	ASSERT(!(str->s_status & STREAM_OUTPUT));
86*101e15b5SRichard Lowe 	ASSERT(str->s_status & (STREAM_SINGLE | STREAM_WIDE));
87*101e15b5SRichard Lowe 	ASSERT(str->s_status & STREAM_OPEN);
88*101e15b5SRichard Lowe 
89*101e15b5SRichard Lowe 	if (str->s_status & STREAM_INSTANT && (str->s_buffer == NULL)) {
90*101e15b5SRichard Lowe 		str->s_buffer = xzmap(0, STDIO_VBUF_SIZE, PROT_READ |
91*101e15b5SRichard Lowe 		    PROT_WRITE, MAP_PRIVATE, 0);
92*101e15b5SRichard Lowe 		if (str->s_buffer == MAP_FAILED)
93*101e15b5SRichard Lowe 			die(EMSG_MMAP);
94*101e15b5SRichard Lowe 		str->s_buffer_size = STDIO_VBUF_SIZE;
95*101e15b5SRichard Lowe 	}
96*101e15b5SRichard Lowe 
97*101e15b5SRichard Lowe 	ASSERT(str->s_buffer != NULL);
98*101e15b5SRichard Lowe 
99*101e15b5SRichard Lowe 	if (stream_is_primed(str)) {
100*101e15b5SRichard Lowe 		/*
101*101e15b5SRichard Lowe 		 * l_data_length is only set to -1 in the case of coincidental
102*101e15b5SRichard Lowe 		 * exhaustion of the input butter.  This is thus the only case
103*101e15b5SRichard Lowe 		 * which involves no copying on a re-prime.
104*101e15b5SRichard Lowe 		 */
105*101e15b5SRichard Lowe 		int shelf_state = shelf;
106*101e15b5SRichard Lowe 
107*101e15b5SRichard Lowe 		ASSERT(str->s_current.l_data_length >= -1);
108*101e15b5SRichard Lowe 		(void) memcpy(str->s_buffer, str->s_current.l_data.sp,
109*101e15b5SRichard Lowe 		    str->s_current.l_data_length + 1);
110*101e15b5SRichard Lowe 		str->s_current.l_data.sp = str->s_buffer;
111*101e15b5SRichard Lowe 
112*101e15b5SRichard Lowe 		/*
113*101e15b5SRichard Lowe 		 * If our current line is incomplete, we need to get the rest of
114*101e15b5SRichard Lowe 		 * the line--if we can't, then we've exhausted memory.
115*101e15b5SRichard Lowe 		 */
116*101e15b5SRichard Lowe 		if ((str->s_current.l_data_length == -1 ||
117*101e15b5SRichard Lowe 		    shelf_state == SHELF_OCCUPIED ||
118*101e15b5SRichard Lowe 		    *(str->s_current.l_data.sp +
119*101e15b5SRichard Lowe 		    str->s_current.l_data_length) != '\n') &&
120*101e15b5SRichard Lowe 		    SOP_FETCH(str) == NEXT_LINE_INCOMPLETE &&
121*101e15b5SRichard Lowe 		    shelf_state == SHELF_OCCUPIED)
122*101e15b5SRichard Lowe 			die(EMSG_MEMORY);
123*101e15b5SRichard Lowe 
124*101e15b5SRichard Lowe 		str->s_current.l_collate.sp = NULL;
125*101e15b5SRichard Lowe 		str->s_current.l_collate_length = 0;
126*101e15b5SRichard Lowe 
127*101e15b5SRichard Lowe 		return (PRIME_SUCCEEDED);
128*101e15b5SRichard Lowe 	}
129*101e15b5SRichard Lowe 
130*101e15b5SRichard Lowe 	stream_set(str, STREAM_PRIMED);
131*101e15b5SRichard Lowe 
132*101e15b5SRichard Lowe 	current_position = (char *)str->s_buffer;
133*101e15b5SRichard Lowe 	end_of_buffer = (char *)str->s_buffer + str->s_buffer_size;
134*101e15b5SRichard Lowe 
135*101e15b5SRichard Lowe 	trip_eof(BF->s_fp);
136*101e15b5SRichard Lowe 	if (!feof(BF->s_fp))
137*101e15b5SRichard Lowe 		(void) fgets(current_position, end_of_buffer - current_position,
138*101e15b5SRichard Lowe 		    BF->s_fp);
139*101e15b5SRichard Lowe 	else {
140*101e15b5SRichard Lowe 		stream_set(str, STREAM_EOS_REACHED);
141*101e15b5SRichard Lowe 		stream_unset(str, STREAM_PRIMED);
142*101e15b5SRichard Lowe 		return (PRIME_FAILED_EMPTY_FILE);
143*101e15b5SRichard Lowe 	}
144*101e15b5SRichard Lowe 
145*101e15b5SRichard Lowe 	str->s_current.l_data.sp = current_position;
146*101e15b5SRichard Lowe 	/*
147*101e15b5SRichard Lowe 	 * Because one might run sort on a binary file, strlen() is no longer
148*101e15b5SRichard Lowe 	 * trustworthy--we must explicitly search for a newline.
149*101e15b5SRichard Lowe 	 */
150*101e15b5SRichard Lowe 	if ((next_nl = memchr(current_position, '\n',
151*101e15b5SRichard Lowe 	    end_of_buffer - current_position)) == NULL) {
152*101e15b5SRichard Lowe 		warn(WMSG_NEWLINE_ADDED, str->s_filename);
153*101e15b5SRichard Lowe 		str->s_current.l_data_length = MIN(strlen(current_position),
154*101e15b5SRichard Lowe 		    end_of_buffer - current_position);
155*101e15b5SRichard Lowe 	} else {
156*101e15b5SRichard Lowe 		str->s_current.l_data_length = next_nl - current_position;
157*101e15b5SRichard Lowe 	}
158*101e15b5SRichard Lowe 
159*101e15b5SRichard Lowe 	str->s_current.l_collate.sp = NULL;
160*101e15b5SRichard Lowe 	str->s_current.l_collate_length = 0;
161*101e15b5SRichard Lowe 
162*101e15b5SRichard Lowe 	__S(stats_incr_fetches());
163*101e15b5SRichard Lowe 	return (PRIME_SUCCEEDED);
164*101e15b5SRichard Lowe }
165*101e15b5SRichard Lowe 
166*101e15b5SRichard Lowe /*
167*101e15b5SRichard Lowe  * stream_stdio_fetch() guarantees the return of a complete line, or a flag
168*101e15b5SRichard Lowe  * indicating that the complete line could not be read.
169*101e15b5SRichard Lowe  */
170*101e15b5SRichard Lowe static ssize_t
stream_stdio_fetch(stream_t * str)171*101e15b5SRichard Lowe stream_stdio_fetch(stream_t *str)
172*101e15b5SRichard Lowe {
173*101e15b5SRichard Lowe 	ssize_t	dist_to_buf_end;
174*101e15b5SRichard Lowe 	int ret_val;
175*101e15b5SRichard Lowe 	char *graft_pt, *next_nl;
176*101e15b5SRichard Lowe 
177*101e15b5SRichard Lowe 	ASSERT(str->s_status & STREAM_OPEN);
178*101e15b5SRichard Lowe 	ASSERT(str->s_status & (STREAM_SINGLE | STREAM_WIDE));
179*101e15b5SRichard Lowe 	ASSERT((str->s_status & STREAM_EOS_REACHED) == 0);
180*101e15b5SRichard Lowe 
181*101e15b5SRichard Lowe 	graft_pt = str->s_current.l_data.sp + str->s_current.l_data_length + 1;
182*101e15b5SRichard Lowe 
183*101e15b5SRichard Lowe 	if (shelf == SHELF_VACANT) {
184*101e15b5SRichard Lowe 		/*
185*101e15b5SRichard Lowe 		 * The graft point is the start of the current line.
186*101e15b5SRichard Lowe 		 */
187*101e15b5SRichard Lowe 		str->s_current.l_data.sp = graft_pt;
188*101e15b5SRichard Lowe 	} else if (str->s_current.l_data_length > -1) {
189*101e15b5SRichard Lowe 		/*
190*101e15b5SRichard Lowe 		 * Correct for terminating NUL on shelved line.  This NUL is
191*101e15b5SRichard Lowe 		 * only present if we didn't have the coincidental case
192*101e15b5SRichard Lowe 		 * mentioned in the comment below.
193*101e15b5SRichard Lowe 		 */
194*101e15b5SRichard Lowe 		graft_pt--;
195*101e15b5SRichard Lowe 	}
196*101e15b5SRichard Lowe 
197*101e15b5SRichard Lowe 	dist_to_buf_end = str->s_buffer_size - (graft_pt -
198*101e15b5SRichard Lowe 	    (char *)str->s_buffer);
199*101e15b5SRichard Lowe 
200*101e15b5SRichard Lowe 	if (dist_to_buf_end <= 1) {
201*101e15b5SRichard Lowe 		/*
202*101e15b5SRichard Lowe 		 * fgets()'s behaviour in the case of a one-character buffer is
203*101e15b5SRichard Lowe 		 * somewhat unhelpful:  it fills the buffer with '\0' and
204*101e15b5SRichard Lowe 		 * returns successfully (even if EOF has been reached for the
205*101e15b5SRichard Lowe 		 * file in question).  Since we may be in the middle of a
206*101e15b5SRichard Lowe 		 * grafting operation, we leave early, maintaining the shelf in
207*101e15b5SRichard Lowe 		 * its current state.
208*101e15b5SRichard Lowe 		 */
209*101e15b5SRichard Lowe 		str->s_current.l_data_length = -1;
210*101e15b5SRichard Lowe 		return (NEXT_LINE_INCOMPLETE);
211*101e15b5SRichard Lowe 	}
212*101e15b5SRichard Lowe 
213*101e15b5SRichard Lowe 	if (fgets(graft_pt, dist_to_buf_end, str->s_type.BF.s_fp) == NULL) {
214*101e15b5SRichard Lowe 		if (feof(str->s_type.BF.s_fp))
215*101e15b5SRichard Lowe 			stream_set(str, STREAM_EOS_REACHED);
216*101e15b5SRichard Lowe 		else
217*101e15b5SRichard Lowe 			die(EMSG_READ, str->s_filename);
218*101e15b5SRichard Lowe 	}
219*101e15b5SRichard Lowe 
220*101e15b5SRichard Lowe 	trip_eof(str->s_type.BF.s_fp);
221*101e15b5SRichard Lowe 	/*
222*101e15b5SRichard Lowe 	 * Because one might run sort on a binary file, strlen() is no longer
223*101e15b5SRichard Lowe 	 * trustworthy--we must explicitly search for a newline.
224*101e15b5SRichard Lowe 	 */
225*101e15b5SRichard Lowe 	if ((next_nl = memchr(str->s_current.l_data.sp, '\n',
226*101e15b5SRichard Lowe 	    dist_to_buf_end)) == NULL) {
227*101e15b5SRichard Lowe 		str->s_current.l_data_length = strlen(str->s_current.l_data.sp);
228*101e15b5SRichard Lowe 	} else {
229*101e15b5SRichard Lowe 		str->s_current.l_data_length = next_nl -
230*101e15b5SRichard Lowe 		    str->s_current.l_data.sp;
231*101e15b5SRichard Lowe 	}
232*101e15b5SRichard Lowe 
233*101e15b5SRichard Lowe 	str->s_current.l_collate_length = 0;
234*101e15b5SRichard Lowe 
235*101e15b5SRichard Lowe 	if (*(str->s_current.l_data.sp + str->s_current.l_data_length) !=
236*101e15b5SRichard Lowe 	    '\n') {
237*101e15b5SRichard Lowe 		if (!feof(str->s_type.BF.s_fp)) {
238*101e15b5SRichard Lowe 			/*
239*101e15b5SRichard Lowe 			 * We were only able to read part of the line; note that
240*101e15b5SRichard Lowe 			 * we have something on the shelf for our next fetch.
241*101e15b5SRichard Lowe 			 * If the shelf was previously occupied, and we still
242*101e15b5SRichard Lowe 			 * can't get the entire line, then we need more
243*101e15b5SRichard Lowe 			 * resources.
244*101e15b5SRichard Lowe 			 */
245*101e15b5SRichard Lowe 			if (shelf == SHELF_OCCUPIED)
246*101e15b5SRichard Lowe 				die(EMSG_MEMORY);
247*101e15b5SRichard Lowe 
248*101e15b5SRichard Lowe 			shelf = SHELF_OCCUPIED;
249*101e15b5SRichard Lowe 			ret_val = NEXT_LINE_INCOMPLETE;
250*101e15b5SRichard Lowe 
251*101e15b5SRichard Lowe 			__S(stats_incr_shelves());
252*101e15b5SRichard Lowe 		} else {
253*101e15b5SRichard Lowe 			stream_set(str, STREAM_EOS_REACHED);
254*101e15b5SRichard Lowe 			warn(WMSG_NEWLINE_ADDED, str->s_filename);
255*101e15b5SRichard Lowe 		}
256*101e15b5SRichard Lowe 	} else {
257*101e15b5SRichard Lowe 		shelf = SHELF_VACANT;
258*101e15b5SRichard Lowe 		ret_val = NEXT_LINE_COMPLETE;
259*101e15b5SRichard Lowe 		__S(stats_incr_fetches());
260*101e15b5SRichard Lowe 	}
261*101e15b5SRichard Lowe 
262*101e15b5SRichard Lowe 	return (ret_val);
263*101e15b5SRichard Lowe }
264*101e15b5SRichard Lowe 
265*101e15b5SRichard Lowe /*
266*101e15b5SRichard Lowe  * stdio_fetch_overwrite() is used when we are performing an operation where we
267*101e15b5SRichard Lowe  * need the buffer contents only over a single period.  (merge and check are
268*101e15b5SRichard Lowe  * operations of this kind.)  In this case, we read the current line at the head
269*101e15b5SRichard Lowe  * of the stream's defined buffer.  If we cannot read the entire line, we have
270*101e15b5SRichard Lowe  * not allocated sufficient memory.
271*101e15b5SRichard Lowe  */
272*101e15b5SRichard Lowe ssize_t
stream_stdio_fetch_overwrite(stream_t * str)273*101e15b5SRichard Lowe stream_stdio_fetch_overwrite(stream_t *str)
274*101e15b5SRichard Lowe {
275*101e15b5SRichard Lowe 	ssize_t	dist_to_buf_end;
276*101e15b5SRichard Lowe 
277*101e15b5SRichard Lowe 	ASSERT(str->s_status & STREAM_OPEN);
278*101e15b5SRichard Lowe 	ASSERT(str->s_status & (STREAM_SINGLE | STREAM_WIDE));
279*101e15b5SRichard Lowe 	ASSERT((str->s_status & STREAM_EOS_REACHED) == 0);
280*101e15b5SRichard Lowe 
281*101e15b5SRichard Lowe 	str->s_current.l_data.sp = str->s_buffer;
282*101e15b5SRichard Lowe 	dist_to_buf_end = str->s_buffer_size;
283*101e15b5SRichard Lowe 
284*101e15b5SRichard Lowe 	if (fgets(str->s_current.l_data.sp, dist_to_buf_end,
285*101e15b5SRichard Lowe 	    str->s_type.BF.s_fp) == NULL) {
286*101e15b5SRichard Lowe 		if (feof(str->s_type.BF.s_fp))
287*101e15b5SRichard Lowe 			stream_set(str, STREAM_EOS_REACHED);
288*101e15b5SRichard Lowe 		else
289*101e15b5SRichard Lowe 			die(EMSG_READ, str->s_filename);
290*101e15b5SRichard Lowe 	}
291*101e15b5SRichard Lowe 
292*101e15b5SRichard Lowe 	trip_eof(str->s_type.BF.s_fp);
293*101e15b5SRichard Lowe 	str->s_current.l_data_length = strlen(str->s_current.l_data.sp) - 1;
294*101e15b5SRichard Lowe 	str->s_current.l_collate_length = 0;
295*101e15b5SRichard Lowe 
296*101e15b5SRichard Lowe 	if (str->s_current.l_data_length == -1 ||
297*101e15b5SRichard Lowe 	    *(str->s_current.l_data.sp + str->s_current.l_data_length) !=
298*101e15b5SRichard Lowe 	    '\n') {
299*101e15b5SRichard Lowe 		if (!feof(str->s_type.BF.s_fp)) {
300*101e15b5SRichard Lowe 			/*
301*101e15b5SRichard Lowe 			 * In the overwrite case, failure to read the entire
302*101e15b5SRichard Lowe 			 * line means our buffer size was insufficient (as we
303*101e15b5SRichard Lowe 			 * are using all of it).  Exit, requesting more
304*101e15b5SRichard Lowe 			 * resources.
305*101e15b5SRichard Lowe 			 */
306*101e15b5SRichard Lowe 			die(EMSG_MEMORY);
307*101e15b5SRichard Lowe 		} else {
308*101e15b5SRichard Lowe 			stream_set(str, STREAM_EOS_REACHED);
309*101e15b5SRichard Lowe 			warn(WMSG_NEWLINE_ADDED, str->s_filename);
310*101e15b5SRichard Lowe 		}
311*101e15b5SRichard Lowe 	}
312*101e15b5SRichard Lowe 
313*101e15b5SRichard Lowe 	__S(stats_incr_fetches());
314*101e15b5SRichard Lowe 	return (NEXT_LINE_COMPLETE);
315*101e15b5SRichard Lowe }
316*101e15b5SRichard Lowe 
317*101e15b5SRichard Lowe int
stream_stdio_is_closable(stream_t * str)318*101e15b5SRichard Lowe stream_stdio_is_closable(stream_t *str)
319*101e15b5SRichard Lowe {
320*101e15b5SRichard Lowe 	if (str->s_status & STREAM_OPEN && !(str->s_status & STREAM_NOTFILE))
321*101e15b5SRichard Lowe 		return (1);
322*101e15b5SRichard Lowe 	return (0);
323*101e15b5SRichard Lowe }
324*101e15b5SRichard Lowe 
325*101e15b5SRichard Lowe int
stream_stdio_close(stream_t * str)326*101e15b5SRichard Lowe stream_stdio_close(stream_t *str)
327*101e15b5SRichard Lowe {
328*101e15b5SRichard Lowe 	ASSERT(str->s_status & STREAM_OPEN);
329*101e15b5SRichard Lowe 
330*101e15b5SRichard Lowe 	if (!(str->s_status & STREAM_OUTPUT)) {
331*101e15b5SRichard Lowe 		if (!(str->s_status & STREAM_NOTFILE))
332*101e15b5SRichard Lowe 			(void) fclose(str->s_type.BF.s_fp);
333*101e15b5SRichard Lowe 
334*101e15b5SRichard Lowe 		if (str->s_type.BF.s_vbuf != NULL) {
335*101e15b5SRichard Lowe 			free(str->s_type.BF.s_vbuf);
336*101e15b5SRichard Lowe 			str->s_type.BF.s_vbuf = NULL;
337*101e15b5SRichard Lowe 		}
338*101e15b5SRichard Lowe 	} else {
339*101e15b5SRichard Lowe 		if (cxwrite(str->s_type.SF.s_fd, NULL, 0) == 0)
340*101e15b5SRichard Lowe 			(void) close(str->s_type.SF.s_fd);
341*101e15b5SRichard Lowe 		else
342*101e15b5SRichard Lowe 			die(EMSG_WRITE, str->s_filename);
343*101e15b5SRichard Lowe 	}
344*101e15b5SRichard Lowe 
345*101e15b5SRichard Lowe 	stream_unset(str, STREAM_OPEN | STREAM_PRIMED | STREAM_OUTPUT);
346*101e15b5SRichard Lowe 	return (1);
347*101e15b5SRichard Lowe }
348*101e15b5SRichard Lowe 
349*101e15b5SRichard Lowe static void
stream_stdio_send_eol(stream_t * str)350*101e15b5SRichard Lowe stream_stdio_send_eol(stream_t *str)
351*101e15b5SRichard Lowe {
352*101e15b5SRichard Lowe 	ASSERT(str->s_status & STREAM_OPEN);
353*101e15b5SRichard Lowe 	ASSERT(str->s_status & STREAM_OUTPUT);
354*101e15b5SRichard Lowe 
355*101e15b5SRichard Lowe 	if (cxwrite(str->s_type.SF.s_fd, "\n", 1) < 0)
356*101e15b5SRichard Lowe 		die(EMSG_WRITE, str->s_filename);
357*101e15b5SRichard Lowe }
358*101e15b5SRichard Lowe 
359*101e15b5SRichard Lowe void
stream_stdio_flush(stream_t * str)360*101e15b5SRichard Lowe stream_stdio_flush(stream_t *str)
361*101e15b5SRichard Lowe {
362*101e15b5SRichard Lowe 	ASSERT(str->s_status & STREAM_OPEN);
363*101e15b5SRichard Lowe 	ASSERT(str->s_status & STREAM_OUTPUT);
364*101e15b5SRichard Lowe 
365*101e15b5SRichard Lowe 	if (cxwrite(str->s_type.SF.s_fd, NULL, 0) < 0)
366*101e15b5SRichard Lowe 		die(EMSG_WRITE, str->s_filename);
367*101e15b5SRichard Lowe }
368*101e15b5SRichard Lowe 
369*101e15b5SRichard Lowe static void
stream_stdio_put_line(stream_t * str,line_rec_t * line)370*101e15b5SRichard Lowe stream_stdio_put_line(stream_t *str, line_rec_t *line)
371*101e15b5SRichard Lowe {
372*101e15b5SRichard Lowe 	ASSERT(str->s_status & STREAM_OPEN);
373*101e15b5SRichard Lowe 	ASSERT(str->s_status & STREAM_OUTPUT);
374*101e15b5SRichard Lowe 
375*101e15b5SRichard Lowe 	if (line->l_data_length >= 0) {
376*101e15b5SRichard Lowe 		if (cxwrite(str->s_type.SF.s_fd, line->l_data.sp,
377*101e15b5SRichard Lowe 		    line->l_data_length) < 0)
378*101e15b5SRichard Lowe 			die(EMSG_WRITE, str->s_filename);
379*101e15b5SRichard Lowe 
380*101e15b5SRichard Lowe 		stream_stdio_send_eol(str);
381*101e15b5SRichard Lowe 		__S(stats_incr_puts());
382*101e15b5SRichard Lowe 	}
383*101e15b5SRichard Lowe 	safe_free(line->l_raw_collate.sp);
384*101e15b5SRichard Lowe 	line->l_raw_collate.sp = NULL;
385*101e15b5SRichard Lowe }
386*101e15b5SRichard Lowe 
387*101e15b5SRichard Lowe void
stream_stdio_put_line_unique(stream_t * str,line_rec_t * line)388*101e15b5SRichard Lowe stream_stdio_put_line_unique(stream_t *str, line_rec_t *line)
389*101e15b5SRichard Lowe {
390*101e15b5SRichard Lowe 	static line_rec_t pvs;
391*101e15b5SRichard Lowe 	static size_t collate_buf_len;
392*101e15b5SRichard Lowe 
393*101e15b5SRichard Lowe 	ASSERT(str->s_status & STREAM_OPEN);
394*101e15b5SRichard Lowe 	ASSERT(str->s_status & STREAM_OUTPUT);
395*101e15b5SRichard Lowe 
396*101e15b5SRichard Lowe 	if (pvs.l_collate.sp != NULL &&
397*101e15b5SRichard Lowe 	    collated(&pvs, line, 0, COLL_UNIQUE) == 0) {
398*101e15b5SRichard Lowe 		__S(stats_incr_not_unique());
399*101e15b5SRichard Lowe 		return;
400*101e15b5SRichard Lowe 	}
401*101e15b5SRichard Lowe 
402*101e15b5SRichard Lowe 	__S(stats_incr_put_unique());
403*101e15b5SRichard Lowe 	stream_stdio_put_line(str, line);
404*101e15b5SRichard Lowe 
405*101e15b5SRichard Lowe 	if (line->l_collate_length + 1 > collate_buf_len) {
406*101e15b5SRichard Lowe 		pvs.l_collate.sp = safe_realloc(pvs.l_collate.sp,
407*101e15b5SRichard Lowe 		    line->l_collate_length + 1);
408*101e15b5SRichard Lowe 		collate_buf_len = line->l_collate_length + 1;
409*101e15b5SRichard Lowe 	}
410*101e15b5SRichard Lowe 
411*101e15b5SRichard Lowe 	(void) memcpy(pvs.l_collate.sp, line->l_collate.sp,
412*101e15b5SRichard Lowe 	    line->l_collate_length);
413*101e15b5SRichard Lowe 	*(pvs.l_collate.sp + line->l_collate_length) = '\0';
414*101e15b5SRichard Lowe 	pvs.l_collate_length = line->l_collate_length;
415*101e15b5SRichard Lowe }
416*101e15b5SRichard Lowe 
417*101e15b5SRichard Lowe int
stream_stdio_unlink(stream_t * str)418*101e15b5SRichard Lowe stream_stdio_unlink(stream_t *str)
419*101e15b5SRichard Lowe {
420*101e15b5SRichard Lowe 	if (!(str->s_status & STREAM_NOTFILE))
421*101e15b5SRichard Lowe 		return (unlink(str->s_filename));
422*101e15b5SRichard Lowe 
423*101e15b5SRichard Lowe 	return (0);
424*101e15b5SRichard Lowe }
425*101e15b5SRichard Lowe 
426*101e15b5SRichard Lowe int
stream_stdio_free(stream_t * str)427*101e15b5SRichard Lowe stream_stdio_free(stream_t *str)
428*101e15b5SRichard Lowe {
429*101e15b5SRichard Lowe 	/*
430*101e15b5SRichard Lowe 	 * Unmap the memory we allocated for input, if it's valid to do so.
431*101e15b5SRichard Lowe 	 */
432*101e15b5SRichard Lowe 	if (!(str->s_status & STREAM_OPEN) ||
433*101e15b5SRichard Lowe 	    (str->s_consumer != NULL &&
434*101e15b5SRichard Lowe 	    str->s_consumer->s_status & STREAM_NOT_FREEABLE))
435*101e15b5SRichard Lowe 		return (0);
436*101e15b5SRichard Lowe 
437*101e15b5SRichard Lowe 	if (str->s_buffer != NULL) {
438*101e15b5SRichard Lowe 		if (munmap(str->s_buffer, str->s_buffer_size) < 0)
439*101e15b5SRichard Lowe 			die(EMSG_MUNMAP, "/dev/zero");
440*101e15b5SRichard Lowe 		else {
441*101e15b5SRichard Lowe 			str->s_buffer = NULL;
442*101e15b5SRichard Lowe 			str->s_buffer_size = 0;
443*101e15b5SRichard Lowe 		}
444*101e15b5SRichard Lowe 	}
445*101e15b5SRichard Lowe 
446*101e15b5SRichard Lowe 	stream_unset(str, STREAM_PRIMED | STREAM_INSTANT);
447*101e15b5SRichard Lowe 
448*101e15b5SRichard Lowe 	return (1);
449*101e15b5SRichard Lowe }
450*101e15b5SRichard Lowe 
451*101e15b5SRichard Lowe static int
stream_stdio_eos(stream_t * str)452*101e15b5SRichard Lowe stream_stdio_eos(stream_t *str)
453*101e15b5SRichard Lowe {
454*101e15b5SRichard Lowe 	int retval = 0;
455*101e15b5SRichard Lowe 
456*101e15b5SRichard Lowe 	ASSERT(!(str->s_status & STREAM_OUTPUT));
457*101e15b5SRichard Lowe 	ASSERT(str->s_status & (STREAM_SINGLE | STREAM_WIDE));
458*101e15b5SRichard Lowe 
459*101e15b5SRichard Lowe 	if (str == NULL || str->s_status & STREAM_EOS_REACHED)
460*101e15b5SRichard Lowe 		return (1);
461*101e15b5SRichard Lowe 
462*101e15b5SRichard Lowe 	trip_eof(str->s_type.BF.s_fp);
463*101e15b5SRichard Lowe 	if (feof(str->s_type.BF.s_fp) &&
464*101e15b5SRichard Lowe 	    shelf == SHELF_VACANT &&
465*101e15b5SRichard Lowe 	    str->s_current.l_collate_length != -1) {
466*101e15b5SRichard Lowe 		retval = 1;
467*101e15b5SRichard Lowe 		stream_set(str, STREAM_EOS_REACHED);
468*101e15b5SRichard Lowe 	}
469*101e15b5SRichard Lowe 
470*101e15b5SRichard Lowe 	return (retval);
471*101e15b5SRichard Lowe }
472*101e15b5SRichard Lowe 
473*101e15b5SRichard Lowe /*ARGSUSED*/
474*101e15b5SRichard Lowe static void
stream_stdio_release_line(stream_t * str)475*101e15b5SRichard Lowe stream_stdio_release_line(stream_t *str)
476*101e15b5SRichard Lowe {
477*101e15b5SRichard Lowe }
478*101e15b5SRichard Lowe 
479*101e15b5SRichard Lowe const stream_ops_t stream_stdio_ops = {
480*101e15b5SRichard Lowe 	stream_stdio_is_closable,
481*101e15b5SRichard Lowe 	stream_stdio_close,
482*101e15b5SRichard Lowe 	stream_stdio_eos,
483*101e15b5SRichard Lowe 	stream_stdio_fetch,
484*101e15b5SRichard Lowe 	stream_stdio_flush,
485*101e15b5SRichard Lowe 	stream_stdio_free,
486*101e15b5SRichard Lowe 	stream_stdio_open_for_write,
487*101e15b5SRichard Lowe 	stream_stdio_prime,
488*101e15b5SRichard Lowe 	stream_stdio_put_line,
489*101e15b5SRichard Lowe 	stream_stdio_release_line,
490*101e15b5SRichard Lowe 	stream_stdio_send_eol,
491*101e15b5SRichard Lowe 	stream_stdio_unlink
492*101e15b5SRichard Lowe };
493