1 /* nbdkit
2 * Copyright (C) 2014 Red Hat Inc.
3 *
4 * Redistribution and use in source and binary forms, with or without
5 * modification, are permitted provided that the following conditions are
6 * met:
7 *
8 * * Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 *
11 * * Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 *
15 * * Neither the name of Red Hat nor the names of its contributors may be
16 * used to endorse or promote products derived from this software without
17 * specific prior written permission.
18 *
19 * THIS SOFTWARE IS PROVIDED BY RED HAT AND CONTRIBUTORS ''AS IS'' AND
20 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
21 * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
22 * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL RED HAT OR
23 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
27 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
29 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
30 * SUCH DAMAGE.
31 */
32
33 #include <config.h>
34
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <fcntl.h>
39 #include <unistd.h>
40 #include <sys/types.h>
41 #include <sys/stat.h>
42 #include <errno.h>
43
44 #include <nbdkit-plugin.h>
45
/* Absolute path of the pipe, set from the pipe=<FILENAME> parameter.
 * Released with free() in streaming_unload.
 */
static char *filename;

/* File descriptor of the opened pipe, or -1 while it is not open. */
static int fd = -1;

/* Size reported to clients.  In theory INT64_MAX, but it breaks qemu's
 * NBD driver.  Can be overridden with the size=<SIZE> parameter.
 */
static int64_t size = INT64_MAX / 2;

/* Non-zero once we have entered the unrecoverable error state.  This
 * happens after a backwards write, a failed write to the pipe, or a
 * read of data that was already written.
 */
static int errorstate;

/* Highest byte (+1) that has been written in the data stream. */
static uint64_t highestwrite;
59
60 /* Called for each key=value passed on the command line. */
61 static int
streaming_config(const char * key,const char * value)62 streaming_config (const char *key, const char *value)
63 {
64 if (strcmp (key, "pipe") == 0) {
65 /* See FILENAMES AND PATHS in nbdkit-plugin(3). */
66 filename = nbdkit_absolute_path (value);
67 if (!filename)
68 return -1;
69 }
70 else if (strcmp (key, "size") == 0) {
71 size = nbdkit_parse_size (value);
72 if (size == -1)
73 return -1;
74 }
75 else {
76 nbdkit_error ("unknown parameter '%s'", key);
77 return -1;
78 }
79
80 return 0;
81 }
82
83 /* Check the user did pass a pipe=<FILENAME> parameter. */
84 static int
streaming_config_complete(void)85 streaming_config_complete (void)
86 {
87 if (filename == NULL) {
88 nbdkit_error ("you must supply the pipe=<FILENAME> parameter "
89 "after the plugin name on the command line");
90 return -1;
91 }
92
93 return 0;
94 }
95
96 static int
streaming_get_ready(void)97 streaming_get_ready (void)
98 {
99 /* Open the file blindly. If this fails with ENOENT then we create a
100 * FIFO and try again.
101 */
102 again:
103 fd = open (filename, O_RDWR|O_CLOEXEC|O_NOCTTY);
104 if (fd == -1) {
105 if (errno != ENOENT) {
106 nbdkit_error ("open: %s: %m", filename);
107 return -1;
108 }
109 if (mknod (filename, S_IFIFO | 0666, 0) == -1) {
110 nbdkit_error ("mknod: %s: %m", filename);
111 return -1;
112 }
113 goto again;
114 }
115
116 return 0;
117 }
118
119 /* nbdkit is shutting down. */
120 static void
streaming_unload(void)121 streaming_unload (void)
122 {
123 if (fd >= 0)
124 close (fd);
125 free (filename);
126 }
127
/* Help text describing the parameters accepted by streaming_config.
 * Installed as .config_help in the plugin registration struct.
 */
#define streaming_config_help \
  "pipe=<FILENAME> (required) The filename to serve.\n" \
  "size=<SIZE> (optional) Stream size."
131
132 /* Create the per-connection handle. */
133 static void *
streaming_open(int readonly)134 streaming_open (int readonly)
135 {
136 void *h;
137
138 if (readonly) {
139 nbdkit_error ("you cannot use the -r option with the streaming plugin");
140 return NULL;
141 }
142
143 if (errorstate) {
144 nbdkit_error ("unrecoverable error state, "
145 "no new connections can be opened");
146 return NULL;
147 }
148
149 /* There is no handle, so return an arbitrary non-NULL pointer. */
150 h = &fd;
151
152 return h;
153 }
154
/* Free up the per-connection handle.  The handle owns no resources, so
 * there is nothing to release.
 */
static void
streaming_close (void *handle)
{
  (void) handle;
}
160
/* Thread model requested from nbdkit.  The callbacks in this file read
 * and update the file-scope fd, highestwrite and errorstate without any
 * locking, so they rely on requests not running concurrently.
 * NOTE(review): the exact guarantees of this model are defined in
 * nbdkit-plugin(3) - confirm against the nbdkit version in use.
 */
#define THREAD_MODEL NBDKIT_THREAD_MODEL_SERIALIZE_ALL_REQUESTS
162
163 /* Return the size of the stream (infinite). */
164 static int64_t
streaming_get_size(void * handle)165 streaming_get_size (void *handle)
166 {
167 return size;
168 }
169
170 /* Write data to the stream. */
171 static int
streaming_pwrite(void * handle,const void * buf,uint32_t count,uint64_t offset)172 streaming_pwrite (void *handle, const void *buf,
173 uint32_t count, uint64_t offset)
174 {
175 size_t n;
176 ssize_t r;
177
178 if (errorstate) {
179 nbdkit_error ("unrecoverable error state");
180 errno = EIO;
181 return -1;
182 }
183
184 if (offset < highestwrite) {
185 nbdkit_error ("client tried to seek backwards and write: "
186 "the streaming plugin does not currently support this");
187 errorstate = 1;
188 errno = EIO;
189 return -1;
190 }
191
192 /* Need to write some zeroes. */
193 if (offset > highestwrite) {
194 int64_t remaining = offset - highestwrite;
195 static char zerobuf[4096];
196
197 while (remaining > 0) {
198 n = remaining > sizeof zerobuf ? sizeof zerobuf : remaining;
199 r = write (fd, zerobuf, n);
200 if (r == -1) {
201 nbdkit_error ("write: %m");
202 errorstate = 1;
203 return -1;
204 }
205 highestwrite += r;
206 remaining -= r;
207 }
208 }
209
210 /* Write the data. */
211 while (count > 0) {
212 r = write (fd, buf, count);
213 if (r == -1) {
214 nbdkit_error ("write: %m");
215 errorstate = 1;
216 return -1;
217 }
218 buf += r;
219 highestwrite += r;
220 count -= r;
221 }
222
223 return 0;
224 }
225
226 /* Read data back from the stream. */
227 static int
streaming_pread(void * handle,void * buf,uint32_t count,uint64_t offset)228 streaming_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
229 {
230 if (errorstate) {
231 nbdkit_error ("unrecoverable error state");
232 errno = EIO;
233 return -1;
234 }
235
236 /* Allow reads which are entirely >= highestwrite. These return zeroes. */
237 if (offset >= highestwrite) {
238 memset (buf, 0, count);
239 return 0;
240 }
241
242 nbdkit_error ("client tried to read: "
243 "the streaming plugin does not currently support this");
244 errorstate = 1;
245 errno = EIO;
246 return -1;
247 }
248
249 static struct nbdkit_plugin plugin = {
250 .name = "streaming",
251 .longname = "nbdkit streaming plugin",
252 .version = PACKAGE_VERSION,
253 .unload = streaming_unload,
254 .config = streaming_config,
255 .config_complete = streaming_config_complete,
256 .config_help = streaming_config_help,
257 .get_ready = streaming_get_ready,
258 .open = streaming_open,
259 .close = streaming_close,
260 .get_size = streaming_get_size,
261 .pwrite = streaming_pwrite,
262 .pread = streaming_pread,
263 .errno_is_preserved = 1,
264 };
265
266 NBDKIT_REGISTER_PLUGIN(plugin)
267