1 /*= -*- c-basic-offset: 4; indent-tabs-mode: nil; -*-
2  *
3  * librsync -- dynamic caching and delta update in HTTP
4  *
5  * Copyright (C) 2000, 2001 by Martin Pool <mbp@sourcefrog.net>
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU Lesser General Public License as published by
9  * the Free Software Foundation; either version 2.1 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU Lesser General Public License for more details.
16  *
17  * You should have received a copy of the GNU Lesser General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21 
22                               /*=
23                                | Where a calculator on the ENIAC is
                               | equipped with 18,000 vacuum tubes and
                               | weighs 30 tons, computers in the
                               | future may have only 1,000 vacuum
27                                | tubes and perhaps weigh 1 1/2
28                                | tons.
29                                |   -- Popular Mechanics, March 1949
30                                */
31 
32 /** \file tube.c
33  * A somewhat elastic but fairly small buffer for data passing through a
34  * stream.
35  *
 * In most cases the iter can adjust to send just as much data as will fit. In
37  * some cases that would be too complicated, because it has to transmit an
38  * integer or something similar. So in that case we stick whatever won't fit
39  * into a small buffer.
40  *
41  * A tube can contain some literal data to go out (typically command bytes),
42  * and also an instruction to copy data from the stream's input or from some
43  * other location. Both literal data and a copy command can be queued at the
44  * same time, but only in that order and at most one of each.
45  *
46  * \todo As an optimization, write it directly to the stream if possible. But
47  * for simplicity don't do that yet.
48  *
49  * \todo I think our current copy code will lock up if the application only
50  * ever calls us with either input or output buffers, and not both. So I guess
51  * in that case we might need to copy into some temporary buffer space, and
52  * then back out again later. */
53 
54 #include "config.h"
55 #include <assert.h>
56 #include <stdlib.h>
57 #include <string.h>
58 #include "librsync.h"
59 #include "job.h"
60 #include "stream.h"
61 #include "trace.h"
62 
/** Flush queued literal bytes from the tube into the stream's output.
 *
 * Copies as many bytes of job->write_buf as the output currently has room
 * for, then slides whatever did not fit down to the start of the buffer so
 * that the next call again reads from offset zero. */
static void rs_tube_catchup_write(rs_job_t *job)
{
    rs_buffers_t *const out = job->stream;
    size_t len = job->write_len;

    assert(len > 0);
    /* Never write more than the output can take right now. */
    len = (out->avail_out < len) ? out->avail_out : len;
    if (len != 0) {
        memcpy(out->next_out, job->write_buf, len);
        out->avail_out -= len;
        out->next_out += len;
        job->write_len -= len;
        if (job->write_len != 0) {
            /* Part of the data is still queued; move it to the front. */
            memmove(job->write_buf, &job->write_buf[len], job->write_len);
        }
    }
    rs_trace("wrote " FMT_SIZE " bytes from tube, " FMT_SIZE " left to write",
             len, job->write_len);
}
83 
84 /** Execute a copy command, taking data from the scoop.
85  *
86  * \sa rs_tube_catchup_copy() */
rs_tube_copy_from_scoop(rs_job_t * job)87 static void rs_tube_copy_from_scoop(rs_job_t *job)
88 {
89     rs_buffers_t *stream = job->stream;
90     size_t len = job->copy_len;
91 
92     assert(len > 0);
93     if (len > job->scoop_avail)
94         len = job->scoop_avail;
95     if (len > stream->avail_out)
96         len = stream->avail_out;
97     if (len) {
98         memcpy(stream->next_out, job->scoop_next, len);
99         stream->next_out += len;
100         stream->avail_out -= len;
101         job->scoop_avail -= len;
102         job->scoop_next += len;
103         job->copy_len -= len;
104     }
105     rs_trace("copied " FMT_SIZE " bytes from scoop, " FMT_SIZE
106              " left in scoop, " FMT_SIZE " left to copy", len, job->scoop_avail,
107              job->copy_len);
108 }
109 
110 /** Execute a copy command, taking data from the stream.
111  *
112  * \sa rs_tube_catchup_copy() */
rs_tube_copy_from_stream(rs_job_t * job)113 static void rs_tube_copy_from_stream(rs_job_t *job)
114 {
115     rs_buffers_t *stream = job->stream;
116     size_t len = job->copy_len;
117 
118     assert(len > 0);
119     if (len > stream->avail_in)
120         len = stream->avail_in;
121     if (len > stream->avail_out)
122         len = stream->avail_out;
123     if (len) {
124         memcpy(stream->next_out, stream->next_in, len);
125         stream->next_out += len;
126         stream->avail_out -= len;
127         stream->next_in += len;
128         stream->avail_in -= len;
129         job->copy_len -= len;
130     }
131     rs_trace("copied " FMT_SIZE " bytes from stream, " FMT_SIZE
132              "left in stream, " FMT_SIZE " left to copy", len, stream->avail_in,
133              job->copy_len);
134 }
135 
136 /** Catch up on an outstanding copy command.
137  *
138  * Takes data from the scoop, and the input (in that order), and writes as much
139  * as will fit to the output, up to the limit of the outstanding copy. */
rs_tube_catchup_copy(rs_job_t * job)140 static void rs_tube_catchup_copy(rs_job_t *job)
141 {
142     assert(job->write_len == 0);
143     assert(job->copy_len > 0);
144 
145     /* If there's data in the scoop, send that first. */
146     if (job->scoop_avail && job->copy_len) {
147         rs_tube_copy_from_scoop(job);
148     }
149     /* If there's more to copy and we emptied the scoop, send input. */
150     if (job->copy_len && !job->scoop_avail) {
151         rs_tube_copy_from_stream(job);
152     }
153 }
154 
155 /** Put whatever will fit from the tube into the output of the stream.
156  *
157  * \return RS_DONE if the tube is now empty and ready to accept another
158  * command, RS_BLOCKED if there is still stuff waiting to go out. */
rs_tube_catchup(rs_job_t * job)159 rs_result rs_tube_catchup(rs_job_t *job)
160 {
161     if (job->write_len) {
162         rs_tube_catchup_write(job);
163         if (job->write_len)
164             return RS_BLOCKED;
165     }
166 
167     if (job->copy_len) {
168         rs_tube_catchup_copy(job);
169         if (job->copy_len) {
170             if (job->stream->eof_in && !job->stream->avail_in
171                 && !job->scoop_avail) {
172                 rs_error("reached end of file while copying data");
173                 return RS_INPUT_ENDED;
174             }
175             return RS_BLOCKED;
176         }
177     }
178     return RS_DONE;
179 }
180 
181 /* Check whether there is data in the tube waiting to go out.
182 
183    \return true if the previous command has finished doing all its output. */
rs_tube_is_idle(rs_job_t const * job)184 int rs_tube_is_idle(rs_job_t const *job)
185 {
186     return job->write_len == 0 && job->copy_len == 0;
187 }
188 
189 /** Queue up a request to copy through \p len bytes from the input to the
190  * output of the stream.
191  *
192  * The data is copied from the scoop (if there is anything there) or from the
193  * input, on the next call to rs_tube_write().
194  *
195  * We can only accept this request if there is no copy command already pending.
196  *
197  * \todo Try to do the copy immediately, and return a result. Then, people can
198  * try to continue if possible. Is this really required? Callers can just go
199  * out and back in again after flushing the tube. */
rs_tube_copy(rs_job_t * job,size_t len)200 void rs_tube_copy(rs_job_t *job, size_t len)
201 {
202     assert(job->copy_len == 0);
203 
204     job->copy_len = len;
205 }
206 
207 /** Push some data into the tube for storage.
208  *
209  * The tube's never supposed to get very big, so this will just pop loudly if
210  * you do that.
211  *
212  * We can't accept write data if there's already a copy command in the tube,
213  * because the write data comes out first. */
rs_tube_write(rs_job_t * job,const void * buf,size_t len)214 void rs_tube_write(rs_job_t *job, const void *buf, size_t len)
215 {
216     assert(job->copy_len == 0);
217     assert(len <= sizeof(job->write_buf) - job->write_len);
218 
219     memcpy(job->write_buf + job->write_len, buf, len);
220     job->write_len += len;
221 }
222