1 /*
2  * Copyright (c) 2011, Google Inc.
3  */
4 #include "cache.h"
5 #include "streaming.h"
6 #include "repository.h"
7 #include "object-store.h"
8 #include "replace-object.h"
9 #include "packfile.h"
10 
/*
 * Per-kind stream hooks. Every stream kind supplies its own read and
 * close functions. For object-backed streams, the open hook selected
 * by istream_source() installs them. Filtered streams get theirs from
 * attach_stream_filter().
 *
 * open:  returns 0 on success, non-zero if this kind cannot serve the
 *        object (open_istream() then falls back to the in-core kind).
 * read:  returns bytes stored, 0 at end of data, or -1 on error.
 * close: releases per-kind resources; close_istream() frees the stream.
 */
typedef int (*open_istream_fn)(struct git_istream *,
			       struct repository *,
			       const struct object_id *,
			       enum object_type *);
typedef int (*close_istream_fn)(struct git_istream *);
typedef ssize_t (*read_istream_fn)(struct git_istream *, char *, size_t);

/* Size in bytes of each of a filtered stream's input and output buffers. */
#define FILTER_BUFFER (1024*16)
19 
/*
 * State of a stream that passes another stream's bytes through a
 * stream_filter (driven by read_istream_filtered()).
 */
struct filtered_istream {
	/* Source stream; closed by close_istream_filtered(). */
	struct git_istream *upstream;
	/* Filter applied to upstream data; freed by close_istream_filtered(). */
	struct stream_filter *filter;
	/* Raw bytes read from upstream. */
	char ibuf[FILTER_BUFFER];
	/* Filter output waiting to be handed to the reader. */
	char obuf[FILTER_BUFFER];
	/* ibuf[i_ptr..i_end) has not yet been fed to the filter. */
	int i_end, i_ptr;
	/* obuf[o_ptr..o_end) has not yet been copied out to the reader. */
	int o_end, o_ptr;
	/* Set once upstream reports end of data; only draining remains. */
	int input_finished;
};
29 
/*
 * An open object stream. The open/read/close hooks select the stream
 * kind. Only the union member matching that kind holds valid data.
 */
struct git_istream {
	open_istream_fn open;
	close_istream_fn close;
	read_istream_fn read;

	/* Filtered streams set this to -1, i.e. unknown. */
	unsigned long size; /* inflated size of full object */
	/*
	 * zlib state, used by the loose and packed-non-delta kinds.
	 *   z_unused: not initialized yet (packed streams inflate lazily)
	 *   z_used:   live; must be ended when the stream is closed
	 *   z_done:   already ended after reaching end of stream
	 *   z_error:  already ended after an inflate failure
	 */
	git_zstream z;
	enum { z_unused, z_used, z_done, z_error } z_state;

	union {
		/* Whole object read into memory (the fallback kind). */
		struct {
			char *buf; /* from read_object_file_extended(); freed on close */
			/* Number of bytes of buf already returned to the reader. */
			unsigned long read_ptr;
		} incore;

		/* Loose object: mapped object file, inflated on demand. */
		struct {
			void *mapped;
			unsigned long mapsize;
			/*
			 * Filled by unpack_loose_header(). The NUL-terminated
			 * header comes first. Any bytes after it, up to
			 * hdr_avail, are the start of the object body.
			 * read_istream_loose() returns those bytes before
			 * inflating more.
			 */
			char hdr[32];
			int hdr_avail;
			int hdr_used;
		} loose;

		/* Non-delta packed object, inflated straight from the pack. */
		struct {
			struct packed_git *pack;
			/*
			 * Offset in the pack. Initially the object's start;
			 * after open, the next compressed byte to inflate.
			 */
			off_t pos;
		} in_pack;

		struct filtered_istream filtered;
	} u;
};
61 
62 /*****************************************************************
63  *
64  * Common helpers
65  *
66  *****************************************************************/
67 
close_deflated_stream(struct git_istream * st)68 static void close_deflated_stream(struct git_istream *st)
69 {
70 	if (st->z_state == z_used)
71 		git_inflate_end(&st->z);
72 }
73 
74 
75 /*****************************************************************
76  *
77  * Filtered stream
78  *
79  *****************************************************************/
80 
close_istream_filtered(struct git_istream * st)81 static int close_istream_filtered(struct git_istream *st)
82 {
83 	free_stream_filter(st->u.filtered.filter);
84 	return close_istream(st->u.filtered.upstream);
85 }
86 
read_istream_filtered(struct git_istream * st,char * buf,size_t sz)87 static ssize_t read_istream_filtered(struct git_istream *st, char *buf,
88 				     size_t sz)
89 {
90 	struct filtered_istream *fs = &(st->u.filtered);
91 	size_t filled = 0;
92 
93 	while (sz) {
94 		/* do we already have filtered output? */
95 		if (fs->o_ptr < fs->o_end) {
96 			size_t to_move = fs->o_end - fs->o_ptr;
97 			if (sz < to_move)
98 				to_move = sz;
99 			memcpy(buf + filled, fs->obuf + fs->o_ptr, to_move);
100 			fs->o_ptr += to_move;
101 			sz -= to_move;
102 			filled += to_move;
103 			continue;
104 		}
105 		fs->o_end = fs->o_ptr = 0;
106 
107 		/* do we have anything to feed the filter with? */
108 		if (fs->i_ptr < fs->i_end) {
109 			size_t to_feed = fs->i_end - fs->i_ptr;
110 			size_t to_receive = FILTER_BUFFER;
111 			if (stream_filter(fs->filter,
112 					  fs->ibuf + fs->i_ptr, &to_feed,
113 					  fs->obuf, &to_receive))
114 				return -1;
115 			fs->i_ptr = fs->i_end - to_feed;
116 			fs->o_end = FILTER_BUFFER - to_receive;
117 			continue;
118 		}
119 
120 		/* tell the filter to drain upon no more input */
121 		if (fs->input_finished) {
122 			size_t to_receive = FILTER_BUFFER;
123 			if (stream_filter(fs->filter,
124 					  NULL, NULL,
125 					  fs->obuf, &to_receive))
126 				return -1;
127 			fs->o_end = FILTER_BUFFER - to_receive;
128 			if (!fs->o_end)
129 				break;
130 			continue;
131 		}
132 		fs->i_end = fs->i_ptr = 0;
133 
134 		/* refill the input from the upstream */
135 		if (!fs->input_finished) {
136 			fs->i_end = read_istream(fs->upstream, fs->ibuf, FILTER_BUFFER);
137 			if (fs->i_end < 0)
138 				return -1;
139 			if (fs->i_end)
140 				continue;
141 		}
142 		fs->input_finished = 1;
143 	}
144 	return filled;
145 }
146 
attach_stream_filter(struct git_istream * st,struct stream_filter * filter)147 static struct git_istream *attach_stream_filter(struct git_istream *st,
148 						struct stream_filter *filter)
149 {
150 	struct git_istream *ifs = xmalloc(sizeof(*ifs));
151 	struct filtered_istream *fs = &(ifs->u.filtered);
152 
153 	ifs->close = close_istream_filtered;
154 	ifs->read = read_istream_filtered;
155 	fs->upstream = st;
156 	fs->filter = filter;
157 	fs->i_end = fs->i_ptr = 0;
158 	fs->o_end = fs->o_ptr = 0;
159 	fs->input_finished = 0;
160 	ifs->size = -1; /* unknown */
161 	return ifs;
162 }
163 
164 /*****************************************************************
165  *
166  * Loose object stream
167  *
168  *****************************************************************/
169 
read_istream_loose(struct git_istream * st,char * buf,size_t sz)170 static ssize_t read_istream_loose(struct git_istream *st, char *buf, size_t sz)
171 {
172 	size_t total_read = 0;
173 
174 	switch (st->z_state) {
175 	case z_done:
176 		return 0;
177 	case z_error:
178 		return -1;
179 	default:
180 		break;
181 	}
182 
183 	if (st->u.loose.hdr_used < st->u.loose.hdr_avail) {
184 		size_t to_copy = st->u.loose.hdr_avail - st->u.loose.hdr_used;
185 		if (sz < to_copy)
186 			to_copy = sz;
187 		memcpy(buf, st->u.loose.hdr + st->u.loose.hdr_used, to_copy);
188 		st->u.loose.hdr_used += to_copy;
189 		total_read += to_copy;
190 	}
191 
192 	while (total_read < sz) {
193 		int status;
194 
195 		st->z.next_out = (unsigned char *)buf + total_read;
196 		st->z.avail_out = sz - total_read;
197 		status = git_inflate(&st->z, Z_FINISH);
198 
199 		total_read = st->z.next_out - (unsigned char *)buf;
200 
201 		if (status == Z_STREAM_END) {
202 			git_inflate_end(&st->z);
203 			st->z_state = z_done;
204 			break;
205 		}
206 		if (status != Z_OK && (status != Z_BUF_ERROR || total_read < sz)) {
207 			git_inflate_end(&st->z);
208 			st->z_state = z_error;
209 			return -1;
210 		}
211 	}
212 	return total_read;
213 }
214 
close_istream_loose(struct git_istream * st)215 static int close_istream_loose(struct git_istream *st)
216 {
217 	close_deflated_stream(st);
218 	munmap(st->u.loose.mapped, st->u.loose.mapsize);
219 	return 0;
220 }
221 
open_istream_loose(struct git_istream * st,struct repository * r,const struct object_id * oid,enum object_type * type)222 static int open_istream_loose(struct git_istream *st, struct repository *r,
223 			      const struct object_id *oid,
224 			      enum object_type *type)
225 {
226 	struct object_info oi = OBJECT_INFO_INIT;
227 	oi.sizep = &st->size;
228 	oi.typep = type;
229 
230 	st->u.loose.mapped = map_loose_object(r, oid, &st->u.loose.mapsize);
231 	if (!st->u.loose.mapped)
232 		return -1;
233 	switch (unpack_loose_header(&st->z, st->u.loose.mapped,
234 				    st->u.loose.mapsize, st->u.loose.hdr,
235 				    sizeof(st->u.loose.hdr), NULL)) {
236 	case ULHR_OK:
237 		break;
238 	case ULHR_BAD:
239 	case ULHR_TOO_LONG:
240 		goto error;
241 	}
242 	if (parse_loose_header(st->u.loose.hdr, &oi) < 0 || *type < 0)
243 		goto error;
244 
245 	st->u.loose.hdr_used = strlen(st->u.loose.hdr) + 1;
246 	st->u.loose.hdr_avail = st->z.total_out;
247 	st->z_state = z_used;
248 	st->close = close_istream_loose;
249 	st->read = read_istream_loose;
250 
251 	return 0;
252 error:
253 	git_inflate_end(&st->z);
254 	munmap(st->u.loose.mapped, st->u.loose.mapsize);
255 	return -1;
256 }
257 
258 
259 /*****************************************************************
260  *
261  * Non-delta packed object stream
262  *
263  *****************************************************************/
264 
read_istream_pack_non_delta(struct git_istream * st,char * buf,size_t sz)265 static ssize_t read_istream_pack_non_delta(struct git_istream *st, char *buf,
266 					   size_t sz)
267 {
268 	size_t total_read = 0;
269 
270 	switch (st->z_state) {
271 	case z_unused:
272 		memset(&st->z, 0, sizeof(st->z));
273 		git_inflate_init(&st->z);
274 		st->z_state = z_used;
275 		break;
276 	case z_done:
277 		return 0;
278 	case z_error:
279 		return -1;
280 	case z_used:
281 		break;
282 	}
283 
284 	while (total_read < sz) {
285 		int status;
286 		struct pack_window *window = NULL;
287 		unsigned char *mapped;
288 
289 		mapped = use_pack(st->u.in_pack.pack, &window,
290 				  st->u.in_pack.pos, &st->z.avail_in);
291 
292 		st->z.next_out = (unsigned char *)buf + total_read;
293 		st->z.avail_out = sz - total_read;
294 		st->z.next_in = mapped;
295 		status = git_inflate(&st->z, Z_FINISH);
296 
297 		st->u.in_pack.pos += st->z.next_in - mapped;
298 		total_read = st->z.next_out - (unsigned char *)buf;
299 		unuse_pack(&window);
300 
301 		if (status == Z_STREAM_END) {
302 			git_inflate_end(&st->z);
303 			st->z_state = z_done;
304 			break;
305 		}
306 
307 		/*
308 		 * Unlike the loose object case, we do not have to worry here
309 		 * about running out of input bytes and spinning infinitely. If
310 		 * we get Z_BUF_ERROR due to too few input bytes, then we'll
311 		 * replenish them in the next use_pack() call when we loop. If
312 		 * we truly hit the end of the pack (i.e., because it's corrupt
313 		 * or truncated), then use_pack() catches that and will die().
314 		 */
315 		if (status != Z_OK && status != Z_BUF_ERROR) {
316 			git_inflate_end(&st->z);
317 			st->z_state = z_error;
318 			return -1;
319 		}
320 	}
321 	return total_read;
322 }
323 
close_istream_pack_non_delta(struct git_istream * st)324 static int close_istream_pack_non_delta(struct git_istream *st)
325 {
326 	close_deflated_stream(st);
327 	return 0;
328 }
329 
open_istream_pack_non_delta(struct git_istream * st,struct repository * r,const struct object_id * oid,enum object_type * type)330 static int open_istream_pack_non_delta(struct git_istream *st,
331 				       struct repository *r,
332 				       const struct object_id *oid,
333 				       enum object_type *type)
334 {
335 	struct pack_window *window;
336 	enum object_type in_pack_type;
337 
338 	window = NULL;
339 
340 	in_pack_type = unpack_object_header(st->u.in_pack.pack,
341 					    &window,
342 					    &st->u.in_pack.pos,
343 					    &st->size);
344 	unuse_pack(&window);
345 	switch (in_pack_type) {
346 	default:
347 		return -1; /* we do not do deltas for now */
348 	case OBJ_COMMIT:
349 	case OBJ_TREE:
350 	case OBJ_BLOB:
351 	case OBJ_TAG:
352 		break;
353 	}
354 	st->z_state = z_unused;
355 	st->close = close_istream_pack_non_delta;
356 	st->read = read_istream_pack_non_delta;
357 
358 	return 0;
359 }
360 
361 
362 /*****************************************************************
363  *
364  * In-core stream
365  *
366  *****************************************************************/
367 
close_istream_incore(struct git_istream * st)368 static int close_istream_incore(struct git_istream *st)
369 {
370 	free(st->u.incore.buf);
371 	return 0;
372 }
373 
read_istream_incore(struct git_istream * st,char * buf,size_t sz)374 static ssize_t read_istream_incore(struct git_istream *st, char *buf, size_t sz)
375 {
376 	size_t read_size = sz;
377 	size_t remainder = st->size - st->u.incore.read_ptr;
378 
379 	if (remainder <= read_size)
380 		read_size = remainder;
381 	if (read_size) {
382 		memcpy(buf, st->u.incore.buf + st->u.incore.read_ptr, read_size);
383 		st->u.incore.read_ptr += read_size;
384 	}
385 	return read_size;
386 }
387 
open_istream_incore(struct git_istream * st,struct repository * r,const struct object_id * oid,enum object_type * type)388 static int open_istream_incore(struct git_istream *st, struct repository *r,
389 			       const struct object_id *oid, enum object_type *type)
390 {
391 	st->u.incore.buf = read_object_file_extended(r, oid, type, &st->size, 0);
392 	st->u.incore.read_ptr = 0;
393 	st->close = close_istream_incore;
394 	st->read = read_istream_incore;
395 
396 	return st->u.incore.buf ? 0 : -1;
397 }
398 
399 /*****************************************************************************
 * static helper functions used by the public streaming interface
401  *****************************************************************************/
402 
istream_source(struct git_istream * st,struct repository * r,const struct object_id * oid,enum object_type * type)403 static int istream_source(struct git_istream *st,
404 			  struct repository *r,
405 			  const struct object_id *oid,
406 			  enum object_type *type)
407 {
408 	unsigned long size;
409 	int status;
410 	struct object_info oi = OBJECT_INFO_INIT;
411 
412 	oi.typep = type;
413 	oi.sizep = &size;
414 	status = oid_object_info_extended(r, oid, &oi, 0);
415 	if (status < 0)
416 		return status;
417 
418 	switch (oi.whence) {
419 	case OI_LOOSE:
420 		st->open = open_istream_loose;
421 		return 0;
422 	case OI_PACKED:
423 		if (!oi.u.packed.is_delta && big_file_threshold < size) {
424 			st->u.in_pack.pack = oi.u.packed.pack;
425 			st->u.in_pack.pos = oi.u.packed.offset;
426 			st->open = open_istream_pack_non_delta;
427 			return 0;
428 		}
429 		/* fallthru */
430 	default:
431 		st->open = open_istream_incore;
432 		return 0;
433 	}
434 }
435 
436 /****************************************************************
437  * Users of streaming interface
438  ****************************************************************/
439 
close_istream(struct git_istream * st)440 int close_istream(struct git_istream *st)
441 {
442 	int r = st->close(st);
443 	free(st);
444 	return r;
445 }
446 
read_istream(struct git_istream * st,void * buf,size_t sz)447 ssize_t read_istream(struct git_istream *st, void *buf, size_t sz)
448 {
449 	return st->read(st, buf, sz);
450 }
451 
open_istream(struct repository * r,const struct object_id * oid,enum object_type * type,unsigned long * size,struct stream_filter * filter)452 struct git_istream *open_istream(struct repository *r,
453 				 const struct object_id *oid,
454 				 enum object_type *type,
455 				 unsigned long *size,
456 				 struct stream_filter *filter)
457 {
458 	struct git_istream *st = xmalloc(sizeof(*st));
459 	const struct object_id *real = lookup_replace_object(r, oid);
460 	int ret = istream_source(st, r, real, type);
461 
462 	if (ret) {
463 		free(st);
464 		return NULL;
465 	}
466 
467 	if (st->open(st, r, real, type)) {
468 		if (open_istream_incore(st, r, real, type)) {
469 			free(st);
470 			return NULL;
471 		}
472 	}
473 	if (filter) {
474 		/* Add "&& !is_null_stream_filter(filter)" for performance */
475 		struct git_istream *nst = attach_stream_filter(st, filter);
476 		if (!nst) {
477 			close_istream(st);
478 			return NULL;
479 		}
480 		st = nst;
481 	}
482 
483 	*size = st->size;
484 	return st;
485 }
486 
stream_blob_to_fd(int fd,const struct object_id * oid,struct stream_filter * filter,int can_seek)487 int stream_blob_to_fd(int fd, const struct object_id *oid, struct stream_filter *filter,
488 		      int can_seek)
489 {
490 	struct git_istream *st;
491 	enum object_type type;
492 	unsigned long sz;
493 	ssize_t kept = 0;
494 	int result = -1;
495 
496 	st = open_istream(the_repository, oid, &type, &sz, filter);
497 	if (!st) {
498 		if (filter)
499 			free_stream_filter(filter);
500 		return result;
501 	}
502 	if (type != OBJ_BLOB)
503 		goto close_and_exit;
504 	for (;;) {
505 		char buf[1024 * 16];
506 		ssize_t wrote, holeto;
507 		ssize_t readlen = read_istream(st, buf, sizeof(buf));
508 
509 		if (readlen < 0)
510 			goto close_and_exit;
511 		if (!readlen)
512 			break;
513 		if (can_seek && sizeof(buf) == readlen) {
514 			for (holeto = 0; holeto < readlen; holeto++)
515 				if (buf[holeto])
516 					break;
517 			if (readlen == holeto) {
518 				kept += holeto;
519 				continue;
520 			}
521 		}
522 
523 		if (kept && lseek(fd, kept, SEEK_CUR) == (off_t) -1)
524 			goto close_and_exit;
525 		else
526 			kept = 0;
527 		wrote = write_in_full(fd, buf, readlen);
528 
529 		if (wrote < 0)
530 			goto close_and_exit;
531 	}
532 	if (kept && (lseek(fd, kept - 1, SEEK_CUR) == (off_t) -1 ||
533 		     xwrite(fd, "", 1) != 1))
534 		goto close_and_exit;
535 	result = 0;
536 
537  close_and_exit:
538 	close_istream(st);
539 	return result;
540 }
541