1 /* nbdkit
2  * Copyright (C) 2014 Red Hat Inc.
3  *
4  * Redistribution and use in source and binary forms, with or without
5  * modification, are permitted provided that the following conditions are
6  * met:
7  *
8  * * Redistributions of source code must retain the above copyright
9  * notice, this list of conditions and the following disclaimer.
10  *
11  * * Redistributions in binary form must reproduce the above copyright
12  * notice, this list of conditions and the following disclaimer in the
13  * documentation and/or other materials provided with the distribution.
14  *
15  * * Neither the name of Red Hat nor the names of its contributors may be
16  * used to endorse or promote products derived from this software without
17  * specific prior written permission.
18  *
19  * THIS SOFTWARE IS PROVIDED BY RED HAT AND CONTRIBUTORS ''AS IS'' AND
20  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
21  * THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
22  * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL RED HAT OR
23  * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
24  * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
25  * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
26  * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
27  * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
29  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
30  * SUCH DAMAGE.
31  */
32 
33 #include <config.h>
34 
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <string.h>
38 #include <fcntl.h>
39 #include <unistd.h>
40 #include <sys/types.h>
41 #include <sys/stat.h>
42 #include <errno.h>
43 
44 #include <nbdkit-plugin.h>
45 
/* Absolute path of the pipe being served.  Set from the pipe=<FILENAME>
 * parameter and released when the plugin is unloaded.
 */
static char *filename;

/* File descriptor of the opened pipe, or -1 while nothing is open. */
static int fd = -1;

/* Size reported to clients.  In theory INT64_MAX, but it breaks
 * qemu's NBD driver.
 */
static int64_t size = INT64_MAX / 2;

/* Non-zero once the plugin has entered the unrecoverable error state
 * (a backwards seek, an unsupported read, or a failed write).
 */
static int errorstate;

/* One past the highest byte that has been written to the data stream. */
static uint64_t highestwrite;
59 
60 /* Called for each key=value passed on the command line. */
61 static int
streaming_config(const char * key,const char * value)62 streaming_config (const char *key, const char *value)
63 {
64   if (strcmp (key, "pipe") == 0) {
65     /* See FILENAMES AND PATHS in nbdkit-plugin(3). */
66     filename = nbdkit_absolute_path (value);
67     if (!filename)
68       return -1;
69   }
70   else if (strcmp (key, "size") == 0) {
71     size = nbdkit_parse_size (value);
72     if (size == -1)
73       return -1;
74   }
75   else {
76     nbdkit_error ("unknown parameter '%s'", key);
77     return -1;
78   }
79 
80   return 0;
81 }
82 
83 /* Check the user did pass a pipe=<FILENAME> parameter. */
84 static int
streaming_config_complete(void)85 streaming_config_complete (void)
86 {
87   if (filename == NULL) {
88     nbdkit_error ("you must supply the pipe=<FILENAME> parameter "
89                   "after the plugin name on the command line");
90     return -1;
91   }
92 
93   return 0;
94 }
95 
96 static int
streaming_get_ready(void)97 streaming_get_ready (void)
98 {
99   /* Open the file blindly.  If this fails with ENOENT then we create a
100    * FIFO and try again.
101    */
102  again:
103   fd = open (filename, O_RDWR|O_CLOEXEC|O_NOCTTY);
104   if (fd == -1) {
105     if (errno != ENOENT) {
106       nbdkit_error ("open: %s: %m", filename);
107       return -1;
108     }
109     if (mknod (filename, S_IFIFO | 0666, 0) == -1) {
110       nbdkit_error ("mknod: %s: %m", filename);
111       return -1;
112     }
113     goto again;
114   }
115 
116   return 0;
117 }
118 
119 /* nbdkit is shutting down. */
120 static void
streaming_unload(void)121 streaming_unload (void)
122 {
123   if (fd >= 0)
124     close (fd);
125   free (filename);
126 }
127 
/* Help text describing the parameters this plugin accepts; referenced
 * from the .config_help field of the plugin structure.
 */
#define streaming_config_help                                   \
  "pipe=<FILENAME>     (required) The filename to serve.\n"     \
  "size=<SIZE>         (optional) Stream size."
131 
132 /* Create the per-connection handle. */
133 static void *
streaming_open(int readonly)134 streaming_open (int readonly)
135 {
136   void *h;
137 
138   if (readonly) {
139     nbdkit_error ("you cannot use the -r option with the streaming plugin");
140     return NULL;
141   }
142 
143   if (errorstate) {
144     nbdkit_error ("unrecoverable error state, "
145                   "no new connections can be opened");
146     return NULL;
147   }
148 
149   /* There is no handle, so return an arbitrary non-NULL pointer. */
150   h = &fd;
151 
152   return h;
153 }
154 
/* Free up the per-connection handle.  The handle returned by open is
 * not an allocation, so there is nothing to release here.
 */
static void
streaming_close (void *handle)
{
  /* Intentionally empty. */
}
160 
/* All requests are serialized: the read/write callbacks share the
 * global 'fd', 'highestwrite' and 'errorstate' without any locking.
 */
#define THREAD_MODEL NBDKIT_THREAD_MODEL_SERIALIZE_ALL_REQUESTS
162 
163 /* Return the size of the stream (infinite). */
164 static int64_t
streaming_get_size(void * handle)165 streaming_get_size (void *handle)
166 {
167   return size;
168 }
169 
170 /* Write data to the stream. */
171 static int
streaming_pwrite(void * handle,const void * buf,uint32_t count,uint64_t offset)172 streaming_pwrite (void *handle, const void *buf,
173                   uint32_t count, uint64_t offset)
174 {
175   size_t n;
176   ssize_t r;
177 
178   if (errorstate) {
179     nbdkit_error ("unrecoverable error state");
180     errno = EIO;
181     return -1;
182   }
183 
184   if (offset < highestwrite) {
185     nbdkit_error ("client tried to seek backwards and write: "
186                   "the streaming plugin does not currently support this");
187     errorstate = 1;
188     errno = EIO;
189     return -1;
190   }
191 
192   /* Need to write some zeroes. */
193   if (offset > highestwrite) {
194     int64_t remaining = offset - highestwrite;
195     static char zerobuf[4096];
196 
197     while (remaining > 0) {
198       n = remaining > sizeof zerobuf ? sizeof zerobuf : remaining;
199       r = write (fd, zerobuf, n);
200       if (r == -1) {
201         nbdkit_error ("write: %m");
202         errorstate = 1;
203         return -1;
204       }
205       highestwrite += r;
206       remaining -= r;
207     }
208   }
209 
210   /* Write the data. */
211   while (count > 0) {
212     r = write (fd, buf, count);
213     if (r == -1) {
214       nbdkit_error ("write: %m");
215       errorstate = 1;
216       return -1;
217     }
218     buf += r;
219     highestwrite += r;
220     count -= r;
221   }
222 
223   return 0;
224 }
225 
226 /* Read data back from the stream. */
227 static int
streaming_pread(void * handle,void * buf,uint32_t count,uint64_t offset)228 streaming_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
229 {
230   if (errorstate) {
231     nbdkit_error ("unrecoverable error state");
232     errno = EIO;
233     return -1;
234   }
235 
236   /* Allow reads which are entirely >= highestwrite.  These return zeroes. */
237   if (offset >= highestwrite) {
238     memset (buf, 0, count);
239     return 0;
240   }
241 
242   nbdkit_error ("client tried to read: "
243                 "the streaming plugin does not currently support this");
244   errorstate = 1;
245   errno = EIO;
246   return -1;
247 }
248 
249 static struct nbdkit_plugin plugin = {
250   .name              = "streaming",
251   .longname          = "nbdkit streaming plugin",
252   .version           = PACKAGE_VERSION,
253   .unload            = streaming_unload,
254   .config            = streaming_config,
255   .config_complete   = streaming_config_complete,
256   .config_help       = streaming_config_help,
257   .get_ready         = streaming_get_ready,
258   .open              = streaming_open,
259   .close             = streaming_close,
260   .get_size          = streaming_get_size,
261   .pwrite            = streaming_pwrite,
262   .pread             = streaming_pread,
263   .errno_is_preserved = 1,
264 };
265 
266 NBDKIT_REGISTER_PLUGIN(plugin)
267