1 /*
2 *
3 * Copyright (c) 2007-2016 The University of Waikato, Hamilton, New Zealand.
4 * All rights reserved.
5 *
6 * This file is part of libwandio.
7 *
8 * This code has been developed by the University of Waikato WAND
9 * research group. For further information please see http://www.wand.net.nz/
10 *
11 * libwandio is free software; you can redistribute it and/or modify
12 * it under the terms of the GNU Lesser General Public License as published by
13 * the Free Software Foundation; either version 3 of the License, or
14 * (at your option) any later version.
15 *
16 * libwandio is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU Lesser General Public License for more details.
20 *
21 * You should have received a copy of the GNU Lesser General Public License
22 * along with this program. If not, see <http://www.gnu.org/licenses/>.
23 *
24 *
25 */
26
27 #include "config.h"
28 #include "wandio.h"
29 #include "wandio_internal.h"
30 #include <sys/types.h>
31 #include <sys/stat.h>
32 #include <fcntl.h>
33 #include <stdlib.h>
34 #include <pthread.h>
35 #include <string.h>
36 #include <stdbool.h>
37 #ifdef HAVE_SYS_PRCTL_H
38 #include <sys/prctl.h>
39 #endif
40
/* Libwandio IO module implementing a threaded writer.
 *
 * This module enables another IO writer, called the "child", to perform its
 * writing using a separate thread. The main thread writes data into a series
 * of 1MB buffers. Meanwhile, the writing thread writes out of these buffers
 * using the callback for the child writer. pthread conditions are used to
 * communicate between the two threads, e.g. when there are buffers available
 * for the main thread to copy data into or when there is data available for
 * the write thread to write.
 */
51
/* Capacity in bytes of a single slice: 1MB */
#define BUFFERSIZE (1024 * 1024)
/* Number of slices in the ring shared by the main and writing threads */
#define BUFFERS 5
55
56 extern iow_source_t thread_wsource;
57
58 /* This structure defines a single buffer or "slice" */
59 struct buffer_t {
60 char buffer[BUFFERSIZE]; /* The buffer itself */
61 int len; /* The size of the buffer */
62 enum { EMPTY = 0, FULL = 1 } state; /* Is the buffer in use? */
63 bool flush;
64 };
65
66 struct state_t {
67 /* The collection of buffers (or slices) */
68 struct buffer_t buffer[BUFFERS];
69 /* The write offset into the current buffer */
70 int64_t offset;
71 /* The writing thread */
72 pthread_t consumer;
73 /* The child writer */
74 iow_t *iow;
75 /* Indicates that there is data in one of the buffers */
76 pthread_cond_t data_ready;
77 /* Indicates that there is a free buffer to write into */
78 pthread_cond_t space_avail;
79 /* The mutex for the write buffers */
80 pthread_mutex_t mutex;
81 /* The index of the buffer to write into next */
82 int out_buffer;
83 /* Indicates whether the main thread is concluding */
84 bool closing;
85 };
86
/* Access the module-private struct state_t stored in an iow_t */
#define DATA(x) ((struct state_t *)((x)->data))
/* The slice that the main thread is currently filling */
#define OUTBUFFER(x) (DATA(x)->buffer[DATA(x)->out_buffer])
/* Smaller of two values. Each argument may be evaluated twice, so do not
 * pass expressions with side effects. */
#define min(a, b) ((a) < (b) ? (a) : (b))
90
91 /* The writing thread */
thread_consumer(void * userdata)92 static void *thread_consumer(void *userdata)
93 {
94 int buffer=0;
95 bool running = true;
96 iow_t *state = (iow_t *) userdata;
97
98 #ifdef PR_SET_NAME
99 char namebuf[17];
100 if (prctl(PR_GET_NAME, namebuf, 0,0,0) == 0) {
101 namebuf[16] = '\0'; /* Make sure it's NUL terminated */
102 /* If the filename is too long, overwrite the last few bytes */
103 if (strlen(namebuf)>9) {
104 strcpy(namebuf+10,"[iow]");
105 }
106 else {
107 strncat(namebuf," [iow]",16);
108 }
109 prctl(PR_SET_NAME, namebuf, 0,0,0);
110 }
111 #endif
112
113 pthread_mutex_lock(&DATA(state)->mutex);
114 do {
115 /* Wait for data that we can write */
116 while (DATA(state)->buffer[buffer].state == EMPTY) {
117 /* Unless, of course, the program is over! */
118 if (DATA(state)->closing)
119 break;
120 pthread_cond_wait(&DATA(state)->data_ready,
121 &DATA(state)->mutex);
122 }
123 /* Empty the buffer using the child writer */
124 pthread_mutex_unlock(&DATA(state)->mutex);
125 wandio_wwrite(
126 DATA(state)->iow,
127 DATA(state)->buffer[buffer].buffer,
128 DATA(state)->buffer[buffer].len);
129 if (DATA(state)->buffer[buffer].flush) {
130 wandio_wflush(DATA(state)->iow);
131 }
132 pthread_mutex_lock(&DATA(state)->mutex);
133
134 /* If we've not reached the end of the file keep going */
135 running = ( DATA(state)->buffer[buffer].len > 0 );
136 DATA(state)->buffer[buffer].len = 0;
137 DATA(state)->buffer[buffer].state = EMPTY;
138 DATA(state)->buffer[buffer].flush = false;
139
140 /* Signal that we've freed up another buffer for the main
141 * thread to copy data into */
142 pthread_cond_signal(&DATA(state)->space_avail);
143
144
145 /* Move on to the next buffer */
146 buffer=(buffer+1) % BUFFERS;
147
148 } while(running);
149
150 /* If we reach here, it's all over so start tidying up */
151 wandio_wdestroy(DATA(state)->iow);
152
153 pthread_mutex_unlock(&DATA(state)->mutex);
154 return NULL;
155 }
156
/* Wrap a child writer so that its writes are performed by a separate thread.
 *
 * Allocates the threaded writer and its state, initialises the mutex and
 * condition variables, and starts the writing thread.
 *
 * child: the writer that the writing thread will write through.
 *
 * Returns the new threaded writer, or NULL if child is NULL or if any
 * allocation, initialisation or thread creation step fails. On failure
 * everything allocated here is released again and the child is left
 * untouched, so the caller remains responsible for it.
 */
iow_t *thread_wopen(iow_t *child)
{
	iow_t *state;
	struct state_t *data;

	if (!child) {
		return NULL;
	}

	state = malloc(sizeof(*state));
	if (!state) {
		return NULL;
	}

	/* calloc leaves every slice zeroed, i.e. EMPTY, zero length and
	 * with its flush flag cleared */
	data = calloc(1, sizeof(*data));
	if (!data) {
		free(state);
		return NULL;
	}

	state->data = data;
	state->source = &thread_wsource;

	data->out_buffer = 0;
	data->offset = 0;
	data->iow = child;
	data->closing = false;

	if (pthread_mutex_init(&data->mutex, NULL) != 0) {
		goto fail_alloc;
	}
	if (pthread_cond_init(&data->data_ready, NULL) != 0) {
		goto fail_mutex;
	}
	if (pthread_cond_init(&data->space_avail, NULL) != 0) {
		goto fail_data_ready;
	}

	/* Start the writer thread */
	if (pthread_create(&data->consumer, NULL, thread_consumer, state) != 0) {
		goto fail_space_avail;
	}

	return state;

fail_space_avail:
	pthread_cond_destroy(&data->space_avail);
fail_data_ready:
	pthread_cond_destroy(&data->data_ready);
fail_mutex:
	pthread_mutex_destroy(&data->mutex);
fail_alloc:
	free(data);
	free(state);
	return NULL;
}
184
/* Queue data for the writing thread.
 *
 * Copies len bytes from buffer into the ring of slices, one slice-sized
 * piece at a time. Whenever the current slice is marked FULL the call blocks
 * on space_avail (counting each wait in write_waits) until the writing thread
 * has emptied it. A slice that becomes completely filled is marked FULL and
 * data_ready is signalled; a partially filled slice stays with the main
 * thread until more data arrives.
 *
 * state:  the threaded writer.
 * buffer: the data to queue.
 * len:    number of bytes to queue; nothing is done if it is not positive.
 *
 * Returns the number of bytes copied into the slices. The count is kept in
 * an int64_t so that writes larger than INT_MAX are reported correctly.
 */
static int64_t thread_wwrite(iow_t *state, const char *buffer, int64_t len)
{
	int slice;
	int64_t copied = 0;
	int newbuffer;

	pthread_mutex_lock(&DATA(state)->mutex);
	while (len > 0) {

		/* Wait for there to be space available for us to write into */
		while (OUTBUFFER(state).state == FULL) {
			write_waits++;
			pthread_cond_wait(&DATA(state)->space_avail,
					  &DATA(state)->mutex);
		}

		/* Work out how much fits in the current slice; this never
		 * exceeds the slice size, so it fits in an int */
		slice = (int)min(
			(int64_t)sizeof(OUTBUFFER(state).buffer) - DATA(state)->offset,
			len);

		/* Copy with the mutex released so the writing thread is not
		 * held up while other slices are waiting to be written */
		pthread_mutex_unlock(&DATA(state)->mutex);
		memcpy(OUTBUFFER(state).buffer + DATA(state)->offset,
		       buffer,
		       (size_t)slice);
		pthread_mutex_lock(&DATA(state)->mutex);

		DATA(state)->offset += slice;
		OUTBUFFER(state).len += slice;

		buffer += slice;
		len -= slice;
		copied += slice;
		newbuffer = DATA(state)->out_buffer;

		/* If we've filled a slice, move on to the next one and
		 * signal to the write thread that there is something for it
		 * to do */
		if (DATA(state)->offset >= (int64_t)sizeof(OUTBUFFER(state).buffer)) {
			OUTBUFFER(state).state = FULL;
			OUTBUFFER(state).flush = false;
			pthread_cond_signal(&DATA(state)->data_ready);
			DATA(state)->offset = 0;
			newbuffer = (newbuffer + 1) % BUFFERS;
		}

		DATA(state)->out_buffer = newbuffer;
	}

	pthread_mutex_unlock(&DATA(state)->mutex);
	return copied;
}
239
/* Hand the partially filled slice to the writing thread and ask for a flush.
 *
 * If the main thread has copied any data into the current slice, the slice
 * is marked FULL with its flush flag set, data_ready is signalled and the
 * main thread moves on to the next slice. The flush of the child writer is
 * then carried out by the writing thread; this call does not wait for it.
 * If no data is pending in the current slice, nothing is queued.
 *
 * iow: the threaded writer.
 *
 * Returns the number of bytes that were pending in the current slice, or 0
 * if there were none.
 */
static int thread_wflush(iow_t *iow)
{
	struct state_t *st = DATA(iow);
	struct buffer_t *cur;
	int64_t pending;

	pthread_mutex_lock(&st->mutex);

	/* Nothing has been copied into the current slice */
	if (st->offset <= 0) {
		pthread_mutex_unlock(&st->mutex);
		return 0;
	}

	pending = st->offset;
	cur = &st->buffer[st->out_buffer];

	/* Pass the slice on as it stands, tagged so that the writing thread
	 * flushes the child after writing it */
	cur->state = FULL;
	cur->flush = true;
	pthread_cond_signal(&st->data_ready);

	/* Start afresh in the next slice */
	st->offset = 0;
	st->out_buffer = (st->out_buffer + 1) % BUFFERS;

	pthread_mutex_unlock(&st->mutex);
	return (int)pending;
}
254
/* Shut down a threaded writer and release its resources.
 *
 * Sets the closing flag and signals data_ready so the writing thread wakes
 * up, then joins that thread. Once it has exited, the mutex and condition
 * variables are destroyed and the writer's state and the writer itself are
 * freed. The child writer is not touched here; the writing thread destroys
 * it before exiting.
 *
 * iow: the threaded writer; it must not be used after this call.
 */
static void thread_wclose(iow_t *iow)
{
	struct state_t *st = DATA(iow);

	/* Tell the writing thread that no more data is coming */
	pthread_mutex_lock(&st->mutex);
	st->closing = true;
	pthread_cond_signal(&st->data_ready);
	pthread_mutex_unlock(&st->mutex);

	/* Wait for the writing thread to finish before tearing down the
	 * primitives it uses */
	pthread_join(st->consumer, NULL);

	pthread_mutex_destroy(&st->mutex);
	pthread_cond_destroy(&st->data_ready);
	pthread_cond_destroy(&st->space_avail);

	free(st);
	free(iow);
}
270
271 iow_source_t thread_wsource = {
272 "threadw",
273 thread_wwrite,
274 thread_wflush,
275 thread_wclose
276 };
277