1 /*
2  *
3  * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4  * All rights reserved.
5  *
6  * This file is part of libwandio.
7  *
8  * This code has been developed by the University of Waikato WAND
9  * research group. For further information please see http://www.wand.net.nz/
10  *
11  * libwandio is free software; you can redistribute it and/or modify
12  * it under the terms of the GNU Lesser General Public License as published by
13  * the Free Software Foundation; either version 3 of the License, or
14  * (at your option) any later version.
15  *
16  * libwandio is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  * GNU Lesser General Public License for more details.
20  *
21  * You should have received a copy of the GNU Lesser General Public License
22  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
23  *
24  *
25  */
26 
27 #include "config.h"
28 #include "wandio.h"
29 #include "wandio_internal.h"
30 #include <sys/types.h>
31 #include <sys/stat.h>
32 #include <fcntl.h>
33 #include <stdlib.h>
34 #include <pthread.h>
35 #include <string.h>
36 #include <stdbool.h>
37 #ifdef HAVE_SYS_PRCTL_H
38 #include <sys/prctl.h>
39 #endif
40 
/* Libwandio IO module implementing a threaded writer.
 *
 * This module enables another IO writer, called the "child", to perform its
 * writing using a separate thread. The main thread writes data into a series
 * of 1MB buffers. Meanwhile, the writing thread writes out of these buffers
 * using the callback for the child writer. pthread conditions are used to
 * communicate between the two threads, e.g. when there are buffers available
 * for the main thread to copy data into or when there is data available for
 * the write thread to write.
 */
51 
/* Each slice holds 1MB (2^20 bytes) of pending output */
#define BUFFERSIZE (1 << 20)
/* How many slices are cycled between the main thread and the writing thread */
#define BUFFERS 5
55 
56 extern iow_source_t thread_wsource;
57 
58 /* This structure defines a single buffer or "slice" */
59 struct buffer_t {
60 	char buffer[BUFFERSIZE];	/* The buffer itself */
61 	int len;			/* The size of the buffer */
62 	enum { EMPTY = 0, FULL = 1 } state;	/* Is the buffer in use? */
63         bool flush;
64 };
65 
66 struct state_t {
67 	/* The collection of buffers (or slices) */
68 	struct buffer_t buffer[BUFFERS];
69 	/* The write offset into the current buffer */
70 	int64_t offset;
71 	/* The writing thread */
72 	pthread_t consumer;
73 	/* The child writer */
74 	iow_t *iow;
75 	/* Indicates that there is data in one of the buffers */
76 	pthread_cond_t data_ready;
77 	/* Indicates that there is a free buffer to write into */
78 	pthread_cond_t space_avail;
79 	/* The mutex for the write buffers */
80 	pthread_mutex_t mutex;
81 	/* The index of the buffer to write into next */
82 	int out_buffer;
83 	/* Indicates whether the main thread is concluding */
84 	bool closing;
85 };
86 
/* The module's private state hanging off an iow_t */
#define DATA(x) ((struct state_t *)((x)->data))
/* The slice that the main thread is currently filling */
#define OUTBUFFER(x) (DATA(x)->buffer[DATA(x)->out_buffer])
/* Smaller of two values. Each argument may be evaluated twice, so do not
 * pass expressions with side effects. */
#define min(a,b) (((a) < (b)) ? (a) : (b))
90 
91 /* The writing thread */
thread_consumer(void * userdata)92 static void *thread_consumer(void *userdata)
93 {
94 	int buffer=0;
95 	bool running = true;
96 	iow_t *state = (iow_t *) userdata;
97 
98 #ifdef PR_SET_NAME
99 	char namebuf[17];
100 	if (prctl(PR_GET_NAME, namebuf, 0,0,0) == 0) {
101 		namebuf[16] = '\0'; /* Make sure it's NUL terminated */
102 		/* If the filename is too long, overwrite the last few bytes */
103 		if (strlen(namebuf)>9) {
104 			strcpy(namebuf+10,"[iow]");
105 		}
106 		else {
107 			strncat(namebuf," [iow]",16);
108 		}
109 		prctl(PR_SET_NAME, namebuf, 0,0,0);
110 	}
111 #endif
112 
113 	pthread_mutex_lock(&DATA(state)->mutex);
114 	do {
115 		/* Wait for data that we can write */
116 		while (DATA(state)->buffer[buffer].state == EMPTY) {
117 			/* Unless, of course, the program is over! */
118 			if (DATA(state)->closing)
119 				break;
120 			pthread_cond_wait(&DATA(state)->data_ready,
121 					&DATA(state)->mutex);
122 		}
123 		/* Empty the buffer using the child writer */
124 		pthread_mutex_unlock(&DATA(state)->mutex);
125 		wandio_wwrite(
126 				DATA(state)->iow,
127 				DATA(state)->buffer[buffer].buffer,
128 				DATA(state)->buffer[buffer].len);
129                 if (DATA(state)->buffer[buffer].flush) {
130                         wandio_wflush(DATA(state)->iow);
131                 }
132 		pthread_mutex_lock(&DATA(state)->mutex);
133 
134 		/* If we've not reached the end of the file keep going */
135 		running = ( DATA(state)->buffer[buffer].len > 0 );
136 		DATA(state)->buffer[buffer].len = 0;
137 		DATA(state)->buffer[buffer].state = EMPTY;
138                 DATA(state)->buffer[buffer].flush = false;
139 
140 		/* Signal that we've freed up another buffer for the main
141 		 * thread to copy data into */
142 		pthread_cond_signal(&DATA(state)->space_avail);
143 
144 
145 		/* Move on to the next buffer */
146 		buffer=(buffer+1) % BUFFERS;
147 
148 	} while(running);
149 
150 	/* If we reach here, it's all over so start tidying up */
151 	wandio_wdestroy(DATA(state)->iow);
152 
153 	pthread_mutex_unlock(&DATA(state)->mutex);
154 	return NULL;
155 }
156 
/* Wrap a child writer so that its writes are performed by a separate thread.
 *
 * child is the writer that the new writing thread will write through.
 *
 * Returns the new threaded writer, or NULL if child is NULL, if memory could
 * not be allocated, or if the mutex, condition variables or writing thread
 * could not be created. On failure the child is left untouched, so it is
 * still the caller's to destroy.
 */
iow_t *thread_wopen(iow_t *child)
{
	iow_t *state;
	struct state_t *st;

	if (!child) {
		return NULL;
	}

	state = malloc(sizeof(*state));
	if (!state) {
		return NULL;
	}

	/* calloc zeroes the structure, so every slice starts out EMPTY
	 * (EMPTY is 0) with a length of 0 and no flush request */
	st = calloc(1, sizeof(*st));
	if (!st) {
		free(state);
		return NULL;
	}

	state->data = st;
	state->source = &thread_wsource;

	st->out_buffer = 0;
	st->offset = 0;
	st->iow = child;
	st->closing = false;

	if (pthread_mutex_init(&st->mutex, NULL) != 0) {
		goto fail_alloc;
	}
	if (pthread_cond_init(&st->data_ready, NULL) != 0) {
		goto fail_mutex;
	}
	if (pthread_cond_init(&st->space_avail, NULL) != 0) {
		goto fail_data_ready;
	}

	/* Start the writer thread; it must come last because the thread
	 * uses everything initialised above */
	if (pthread_create(&st->consumer, NULL, thread_consumer, state) != 0) {
		goto fail_space_avail;
	}

	return state;

fail_space_avail:
	pthread_cond_destroy(&st->space_avail);
fail_data_ready:
	pthread_cond_destroy(&st->data_ready);
fail_mutex:
	pthread_mutex_destroy(&st->mutex);
fail_alloc:
	free(st);
	free(state);
	return NULL;
}
184 
/* Queue data for the writing thread.
 *
 * Copies len bytes from buffer into the slices, blocking on space_avail
 * whenever the current slice is still FULL. Each time a slice fills up
 * completely it is marked FULL and data_ready is signalled. A partly filled
 * slice stays with the main thread until more data arrives, thread_wflush()
 * is called, or the writer is closed.
 *
 * Returns the number of bytes copied, which is len for a positive len and 0
 * otherwise. The count is kept in an int64_t so that a request larger than
 * INT_MAX bytes is reported correctly.
 */
static int64_t thread_wwrite(iow_t *state, const char *buffer, int64_t len)
{
	struct state_t *st = DATA(state);
	int64_t copied = 0;

	pthread_mutex_lock(&st->mutex);
	while (len > 0) {
		int64_t slice;

		/* Wait for there to be space available for us to write into */
		while (OUTBUFFER(state).state == FULL) {
			write_waits++;
			pthread_cond_wait(&st->space_avail, &st->mutex);
		}

		/* Take as much as fits in the rest of the current slice */
		slice = min(
			(int64_t)sizeof(OUTBUFFER(state).buffer) - st->offset,
			len);

		/* Copy without holding the mutex. out_buffer and offset are
		 * only changed by this function and thread_wflush(). */
		pthread_mutex_unlock(&st->mutex);
		memcpy(OUTBUFFER(state).buffer + st->offset,
				buffer,
				(size_t)slice);
		pthread_mutex_lock(&st->mutex);

		st->offset += slice;
		/* slice never exceeds BUFFERSIZE, so it fits in an int */
		OUTBUFFER(state).len += (int)slice;

		buffer += slice;
		len -= slice;
		copied += slice;

		/* If we've filled a slice, hand it to the writing thread and
		 * move on to the next one */
		if (st->offset >= (int64_t)sizeof(OUTBUFFER(state).buffer)) {
			OUTBUFFER(state).state = FULL;
			OUTBUFFER(state).flush = false;
			pthread_cond_signal(&st->data_ready);
			st->offset = 0;
			st->out_buffer = (st->out_buffer + 1) % BUFFERS;
		}
	}

	pthread_mutex_unlock(&st->mutex);
	return copied;
}
239 
/* Hand the partly filled slice, if any, to the writing thread and ask it to
 * flush the child writer after writing that slice.
 *
 * This only queues the request; it does not wait for the writing thread to
 * act on it. Nothing is queued when the current slice holds no data.
 *
 * Returns the number of bytes that were in the slice handed over, or 0 if
 * there was nothing to hand over.
 */
static int thread_wflush(iow_t *iow)
{
	struct state_t *st = DATA(iow);
	int64_t flushed = 0;

	pthread_mutex_lock(&st->mutex);
	if (st->offset > 0) {
		struct buffer_t *slot = &st->buffer[st->out_buffer];

		flushed = st->offset;

		/* Pass the slice over with a flush request attached */
		slot->flush = true;
		slot->state = FULL;

		/* Start afresh at the beginning of the next slice */
		st->out_buffer = (st->out_buffer + 1) % BUFFERS;
		st->offset = 0;

		pthread_cond_signal(&st->data_ready);
	}
	pthread_mutex_unlock(&st->mutex);

	return (int)flushed;
}
254 
/* Shut down a threaded writer.
 *
 * Sets the closing flag and wakes the writing thread, then joins it. Once
 * the thread has exited, the synchronisation objects are destroyed and the
 * writer's memory is released. The child writer is destroyed by the writing
 * thread itself before it exits.
 */
static void thread_wclose(iow_t *iow)
{
	struct state_t *st = DATA(iow);

	/* Tell the writing thread that no more data is coming */
	pthread_mutex_lock(&st->mutex);
	st->closing = true;
	pthread_cond_signal(&st->data_ready);
	pthread_mutex_unlock(&st->mutex);

	/* Wait for it to finish writing what is left */
	pthread_join(st->consumer, NULL);

	pthread_mutex_destroy(&st->mutex);
	pthread_cond_destroy(&st->data_ready);
	pthread_cond_destroy(&st->space_avail);

	/* st is the same pointer as iow->data */
	free(st);
	free(iow);
}
270 
271 iow_source_t thread_wsource = {
272 	"threadw",
273 	thread_wwrite,
274         thread_wflush,
275 	thread_wclose
276 };
277