1 /* @(#)ringbuff.c	1.24 10/12/19 Copyright 1998,1999,2000 Heiko Eissfeldt, Copyright 2004-2010 J. Schilling */
2 #include "config.h"
3 #ifndef lint
4 static	UConst char sccsid[] =
5 "@(#)ringbuff.c	1.24 10/12/19 Copyright 1998,1999,2000 Heiko Eissfeldt, Copyright 2004-2010 J. Schilling";
6 #endif
7 /*
8  * Ringbuffer handling
9  */
10 /*
11  * The contents of this file are subject to the terms of the
12  * Common Development and Distribution License, Version 1.0 only
13  * (the "License").  You may not use this file except in compliance
14  * with the License.
15  *
16  * See the file CDDL.Schily.txt in this distribution for details.
17  * A copy of the CDDL is also available via the Internet at
18  * http://www.opensource.org/licenses/cddl1.txt
19  *
20  * When distributing Covered Code, include this CDDL HEADER in each
21  * file and include the License file CDDL.Schily.txt from this distribution.
22  */
23 
24 #include "config.h"
25 
26 #include <schily/stdlib.h>
27 #include <schily/stdio.h>
28 #include <schily/standard.h>
29 #include <schily/unistd.h>
30 #include <schily/schily.h>
31 #include <schily/nlsdefs.h>
32 
33 #if defined(HAVE_SEMGET) && defined(USE_SEMAPHORES)
34 #include <schily/ipc.h>
35 #include <schily/sem.h>
36 #endif
37 
38 #include <scg/scsitransp.h>
39 
40 #include "mytype.h"
41 #include "global.h"
42 #include "interface.h"
43 #include "ringbuff.h"
44 #include "semshm.h"
45 #include "exitcodes.h"
46 
47 #undef WARN_INTERRUPT
48 #undef _DEBUG
49 #include <schily/assert.h>
50 
51 static void occupy_buffer	__PR((void));
52 
53 myringbuff		**he_fill_buffer;
54 myringbuff		**last_buffer;
55 volatile unsigned long	*total_segments_read;
56 volatile unsigned long	*total_segments_written;
57 volatile int		*child_waits;
58 volatile int		*parent_waits;
59 volatile int		*in_lendian;
60 volatile int		*eorecording;
61 
62 static myringbuff	*previous_read_buffer;
63 static unsigned int	total_buffers;
64 
65 #define	SEMS	2
66 
67 #define	defined_buffers()	((*total_segments_read) - \
68 				(*total_segments_written))
69 #define	free_buffers()		(total_buffers - defined_buffers())
70 #define	occupied_buffers()	(defined_buffers())
71 
72 /* ARGSUSED */
73 void
set_total_buffers(num_buffers,mysem_id)74 set_total_buffers(num_buffers, mysem_id)
75 	unsigned int	num_buffers;
76 	int		mysem_id;
77 {
78 #if defined(HAVE_SEMGET) && defined(USE_SEMAPHORES)
79 	union my_semun	mysemun;
80 
81 	mysemun.val   = 0;
82 	if (semctl(mysem_id, (int) DEF_SEM, SETVAL, mysemun) < 0) {
83 		errmsg(_("Error in semctl DEF_SEM.\n"));
84 	}
85 
86 	mysemun.val   = num_buffers;
87 	if (semctl(mysem_id, (int) FREE_SEM, SETVAL, mysemun) < 0) {
88 		errmsg(_("Error in semctl FREE_SEM.\n"));
89 	}
90 #endif
91 
92 	total_buffers = num_buffers;
93 
94 	/*
95 	 * initialize pointers
96 	 */
97 	*he_fill_buffer = *last_buffer = previous_read_buffer = NULL;
98 #ifdef DEBUG_SHM
99 	fprintf(stderr,
100 		"init: fill_b = %p,  last_b = %p\n",
101 		*he_fill_buffer, *last_buffer);
102 #endif
103 }
104 
105 const myringbuff *
get_previous_read_buffer()106 get_previous_read_buffer()
107 {
108 	assert(previous_read_buffer != NULL);
109 	assert(previous_read_buffer != *he_fill_buffer);
110 	return (previous_read_buffer);
111 }
112 
113 const myringbuff *
get_he_fill_buffer()114 get_he_fill_buffer()
115 {
116 	assert(*he_fill_buffer != NULL);
117 	assert(previous_read_buffer != *he_fill_buffer);
118 	return (*he_fill_buffer);
119 }
120 
121 void
define_buffer()122 define_buffer()
123 {
124 	assert(defined_buffers() < total_buffers);
125 
126 #ifdef _DEBUG
127 #if 0
128 	fprintf(stderr, "stop  reading  %p - %p\n",
129 		*he_fill_buffer, (char *)(*he_fill_buffer) + ENTRY_SIZE -1);
130 #endif
131 #endif
132 
133 	if (*last_buffer == NULL)
134 		*last_buffer = *he_fill_buffer;
135 #ifdef DEBUG_SHM
136 	fprintf(stderr,
137 		"define: fill_b = %p,  last_b = %p\n",
138 		*he_fill_buffer, *last_buffer);
139 #endif
140 
141 	(*total_segments_read)++;
142 	semrelease(sem_id, DEF_SEM, 1);
143 }
144 
145 void
drop_buffer()146 drop_buffer()
147 {
148 	assert(free_buffers() < total_buffers);
149 	assert(occupied_buffers() > 0);
150 
151 #ifdef _DEBUG
152 #if 0
153 	fprintf(stderr, " stop  writing %p - %p ",
154 		*last_buffer, (char *)(*last_buffer) + ENTRY_SIZE -1);
155 #endif
156 #endif
157 
158 	if (*last_buffer == NULL)
159 		*last_buffer = *he_fill_buffer;
160 	else
161 		*last_buffer = INC(*last_buffer);
162 #ifdef DEBUG_SHM
163 	fprintf(stderr,
164 		"drop: fill_b = %p,  last_b = %p\n",
165 		*he_fill_buffer, *last_buffer);
166 #endif
167 	(*total_segments_written)++;
168 	semrelease(sem_id, FREE_SEM, 1);
169 }
170 
171 void
drop_all_buffers()172 drop_all_buffers()
173 {
174 	(*total_segments_written) = (*total_segments_read);
175 	semrelease(sem_id, FREE_SEM, total_buffers);
176 }
177 
178 static void
occupy_buffer()179 occupy_buffer()
180 {
181 	assert(occupied_buffers() <= total_buffers);
182 
183 	previous_read_buffer = *he_fill_buffer;
184 
185 	if (*he_fill_buffer == NULL) {
186 		*he_fill_buffer = RB_BASE;
187 	} else {
188 		*he_fill_buffer = INC(*he_fill_buffer);
189 	}
190 }
191 
192 #if defined HAVE_FORK_AND_SHAREDMEM
193 myringbuff *
get_next_buffer()194 get_next_buffer()
195 {
196 	if (!global.have_forked) {
197 		occupy_buffer();
198 		return (*he_fill_buffer);
199 	}
200 #ifdef WARN_INTERRUPT
201 	if (free_buffers() <= 0) {
202 		fprintf(stderr,
203 		_("READER waits!! r=%lu, w=%lu\n"), *total_segments_read,
204 			*total_segments_written);
205 	}
206 #endif
207 
208 	/*
209 	 * wait for a new buffer to become available
210 	 */
211 	if (semrequest(sem_id, FREE_SEM) != 0) {
212 		/*
213 		 * semaphore operation failed.
214 		 * try again...
215 		 */
216 		errmsgno(EX_BAD, _("Child reader sem request failed.\n"));
217 		exit(SEMAPHORE_ERROR);
218 	}
219 #if 0
220 	fprintf(stderr, "start reading  %p - %p\n",
221 		*he_fill_buffer, (char *)(*fill_buffer) + ENTRY_SIZE -1);
222 #endif
223 
224 	occupy_buffer();
225 
226 #ifdef DEBUG_SHM
227 	fprintf(stderr,
228 		"next: fill_b = %p,  last_b = %p, @last = %p\n",
229 		*he_fill_buffer, *last_buffer, last_buffer);
230 #endif
231 	return (*he_fill_buffer);
232 }
233 
234 myringbuff *
get_oldest_buffer()235 get_oldest_buffer()
236 {
237 	myringbuff	*retval;
238 
239 	if (!global.have_forked)
240 		return (*he_fill_buffer);
241 
242 #ifdef WARN_INTERRUPT
243 	if (free_buffers() == total_buffers) {
244 		fprintf(stderr,
245 		_("WRITER waits!! r=%lu, w=%lu\n"), *total_segments_read,
246 			*total_segments_written);
247 	}
248 #endif
249 	/*
250 	 * wait for buffer to be defined
251 	 */
252 	seterrno(0);
253 	if (semrequest(sem_id, DEF_SEM) != 0) {
254 		if (geterrno() == 0)
255 			return ((myringbuff *)NULL);
256 		/*
257 		 * semaphore operation failed.
258 		 */
259 		errmsg(_("Parent writer sem request failed.\n"));
260 		return ((myringbuff *)NULL);
261 	}
262 
263 	retval = *last_buffer;
264 
265 #if 0
266 	fprintf(stderr, " begin writing %p - %p\n",
267 			retval, (char *)retval + ENTRY_SIZE -1);
268 #endif
269 
270 	return (retval);
271 }
272 #else /* HAVE_FORK_AND_SHAREDMEM */
273 myringbuff *
get_next_buffer()274 get_next_buffer()
275 {
276 	occupy_buffer();
277 	return (*he_fill_buffer);
278 }
279 
280 myringbuff *
get_oldest_buffer()281 get_oldest_buffer()
282 {
283 	return (*he_fill_buffer);
284 }
285 #endif /* HAVE_FORK_AND_SHAREDMEM */
286