1 /*
2  * Copyright (C) 2012-2020 all contributors <cmogstored-public@yhbt.net>
3  * License: GPL-3.0+ <https://www.gnu.org/licenses/gpl-3.0.txt>
4  */
5 #include "cmogstored.h"
6 #include "http.h"
7 #include "digest.h"
8 
/*
 * Per-thread (__thread) PRNG state used to generate the random component
 * of temporary file names for PUT requests.
 */
static __thread struct {
	bool ready; /* set to true once rnd_init_per_thread() has run */
	struct random_data data; /* handle passed to initstate_r()/random_r() */
	char state[128]; /* state buffer handed to initstate_r() */
} rnd;
14 
file_close_null(struct mog_fd * mfd)15 static void file_close_null(struct mog_fd *mfd)
16 {
17 	struct mog_http *http = &mfd->as.http;
18 
19 	if (http->forward == NULL)
20 		return;
21 	mog_http_unlink_ftmp(http);
22 	TRACE(CMOGSTORED_HTTP_BYTES_XFER(mfd->fd, http->forward->as.file.foff));
23 	mog_file_close(http->forward);
24 	http->forward = NULL;
25 }
26 
mog_http_write_full(struct mog_fd * file_mfd,char * buf,size_t buf_len)27 bool mog_http_write_full(struct mog_fd *file_mfd, char *buf, size_t buf_len)
28 {
29 	struct mog_file *file = &file_mfd->as.file;
30 	ssize_t w;
31 	const char *errpath;
32 
33 	if (file->digest.ctx)
34 		gc_hash_write(file->digest.ctx, buf_len, buf);
35 	if (buf_len == 0)
36 		return true;
37 
38 	errno = 0;
39 	for (;;) {
40 		w = pwrite(file_mfd->fd, buf, buf_len, file->foff);
41 
42 		if (w > 0) {
43 			file->foff += w;
44 			if (w == buf_len)
45 				return true;
46 
47 			buf_len -= w;
48 			buf += w;
49 			continue;
50 		}
51 		if (w < 0 && errno == EINTR)
52 			continue;
53 		if (w == 0 && errno == 0)
54 			errno = ENOSPC;
55 
56 		break;
57 	}
58 
59 	errpath = file->tmppath ? file->tmppath : file->path;
60 
61 	PRESERVE_ERRNO(do {
62 		if (w == 0)
63 			syslog(LOG_ERR,
64 			       "pwrite() to %s wrote zero bytes of "
65 			       "%llu at offset: %lld: assuming %m",
66 			       errpath, (unsigned long long)buf_len,
67 			       (long long)file->foff);
68 		else
69 			syslog(LOG_ERR,
70 			       "pwrite() to %s failed at offset: %lld: %m",
71 			       errpath, (long long)file->foff);
72 	} while (0));
73 
74 	return false;
75 }
76 
77 #define stop(mfd,status) stop0((mfd),(status),sizeof(status)-1);
78 
79 MOG_NOINLINE static enum mog_next
stop0(struct mog_fd * mfd,const char * status,size_t status_len)80 stop0(struct mog_fd *mfd, const char *status, size_t status_len)
81 {
82 	if (status) {
83 		struct iovec iov;
84 		union { const char *in; char *out; } deconst;
85 
86 		deconst.in = status;
87 		iov.iov_base = deconst.out;
88 		iov.iov_len = status_len;
89 
90 		mog_http_resp0(mfd, &iov, false);
91 	}
92 	file_close_null(mfd);
93 	return MOG_NEXT_CLOSE;
94 }
95 
96 MOG_NOINLINE static enum mog_next
write_err(struct mog_fd * mfd,const char * default_msg)97 write_err(struct mog_fd *mfd, const char *default_msg)
98 {
99 	switch (errno) {
100 	case ERANGE:
101 	case ENOSPC:
102 	case EFBIG:
103 		return stop(mfd, "507 Insufficient Storage");
104 	}
105 
106 	if (default_msg == NULL)
107 		default_msg = "500 Internal Server Error";
108 
109 	return stop0(mfd, default_msg, strlen(default_msg));
110 }
111 
md5_ok(struct mog_http * http)112 static bool md5_ok(struct mog_http *http)
113 {
114 	gc_hash_handle ctx = http->forward->as.file.digest.ctx;
115 	const char *result;
116 
117 	/* PUT requests don't _require_ Content-MD5 header/trailer */
118 	if (ctx == NULL)
119 		return true;
120 
121 	result = gc_hash_read(ctx);
122 
123 	return (memcmp(http->expect_md5, result, 16) == 0);
124 }
125 
set_perms_commit(struct mog_http * http)126 static bool set_perms_commit(struct mog_http *http)
127 {
128 	struct mog_file *file = &http->forward->as.file;
129 
130 	if (fchmod(http->forward->fd, http->svc->put_perms) != 0) {
131 		syslog(LOG_ERR, "fchmod() failed: %m");
132 		return false;
133 	}
134 
135 	if (file->tmppath == NULL)
136 		return true;
137 	assert(file->path && "file->path NULL when file->tmppath set");
138 	if (mog_rename(http->svc, file->tmppath, file->path) == 0) {
139 		mog_free_and_null(&file->tmppath);
140 		return true;
141 	}
142 
143 	syslog(LOG_ERR, "renameat(%s => %s) failed: %m",
144 	       file->tmppath, file->path);
145 	return false;
146 }
147 
put_commit_resp(struct mog_fd * mfd)148 static void put_commit_resp(struct mog_fd *mfd)
149 {
150 	struct mog_http *http = &mfd->as.http;
151 
152 	if (md5_ok(http)) { /* true if there's no MD5, too */
153 		if (set_perms_commit(http)) {
154 			file_close_null(mfd);
155 			mog_http_resp(mfd, "201 Created", true);
156 			mog_notify(MOG_NOTIFY_DEVICE_REFRESH);
157 		} else {
158 			file_close_null(mfd);
159 			mog_http_resp(mfd, "500 Internal Server Error", false);
160 		}
161 	} else {
162 		file_close_null(mfd);
163 		mog_http_resp(mfd, "400 Bad Request", true);
164 	}
165 }
166 
http_put_commit(struct mog_fd * mfd)167 static enum mog_next http_put_commit(struct mog_fd *mfd)
168 {
169 	struct mog_http *http = &mfd->as.http;
170 
171 	put_commit_resp(mfd);
172 
173 	if (http->wbuf && http->wbuf != MOG_WR_ERROR)
174 		return MOG_NEXT_WAIT_WR;
175 	if (!http->_p.persistent || http->wbuf == MOG_WR_ERROR)
176 		return MOG_NEXT_CLOSE;
177 	mog_http_reset(mfd);
178 	return MOG_NEXT_ACTIVE;
179 }
180 
stash_advance_rbuf(struct mog_http * http,char * buf,size_t buf_len)181 static void stash_advance_rbuf(struct mog_http *http, char *buf, size_t buf_len)
182 {
183 	struct mog_rbuf *rbuf = http->rbuf;
184 	size_t end = http->_p.line_end + 1;
185 
186 	if (http->_p.line_end == 0 || buf_len <= end) {
187 		http->_p.buf_off = 0;
188 		mog_rbuf_reattach_and_null(&http->rbuf);
189 		return;
190 	}
191 
192 	assert(buf[http->_p.line_end] == '\n' && "line_end is not LF");
193 	assert(buf_len <= MOG_RBUF_MAX_SIZE && "bad rbuf size");
194 	assert(end <= http->_p.buf_off && "invalid line end");
195 	if (rbuf == NULL)
196 		http->rbuf = rbuf = mog_rbuf_new(MOG_RBUF_BASE_SIZE);
197 
198 	memmove(rbuf->rptr, buf + end, buf_len - end);
199 	rbuf->rsize = buf_len - end;
200 	http->_p.buf_off -= end;
201 	if (http->_p.tmp_tip >= end)
202 		http->_p.tmp_tip -= end;
203 	http->_p.line_end = 0;
204 }
205 
206 static void
chunked_body_after_header(struct mog_fd * mfd,char * buf,size_t buf_len)207 chunked_body_after_header(struct mog_fd *mfd, char *buf, size_t buf_len)
208 {
209 	struct mog_http *http = &mfd->as.http;
210 	size_t tmpoff = http->_p.buf_off;
211 
212 	mog_chunk_init(http);
213 	http->_p.buf_off = tmpoff;
214 
215 	switch (mog_chunk_parse(http, buf, buf_len)) {
216 	case MOG_PARSER_ERROR:
217 		(void)write_err(mfd, "400 Bad Request");
218 		return;
219 	case MOG_PARSER_CONTINUE:
220 		assert(http->_p.chunk_state != MOG_CHUNK_STATE_DONE);
221 		/* fall through */
222 	case MOG_PARSER_DONE:
223 		switch (http->_p.chunk_state) {
224 		case MOG_CHUNK_STATE_SIZE:
225 			assert(http->_p.buf_off == buf_len
226 			       && "HTTP chunk parser didn't finish size");
227 			return;
228 		case MOG_CHUNK_STATE_DATA:
229 			assert(http->_p.buf_off == buf_len
230 			       && "HTTP chunk parser didn't finish data");
231 			return;
232 		case MOG_CHUNK_STATE_TRAILER:
233 			assert(http->_p.buf_off > 0 &&
234 			       "http->_p.buf_off unset while in trailer");
235 			stash_advance_rbuf(http, buf, buf_len);
236 			http->_p.skip_rbuf_defer = 1;
237 			return;
238 		case MOG_CHUNK_STATE_DONE:
239 			put_commit_resp(mfd);
240 			assert(http->_p.buf_off > 0 &&
241 			       "http->_p.buf_off unset after chunk body done");
242 			stash_advance_rbuf(http, buf, buf_len);
243 			http->_p.skip_rbuf_defer = 1;
244 		}
245 	}
246 }
247 
248 static void
identity_body_after_header(struct mog_fd * mfd,char * buf,size_t buf_len)249 identity_body_after_header(struct mog_fd *mfd, char *buf, size_t buf_len)
250 {
251 	struct mog_http *http = &mfd->as.http;
252 	size_t body_len = buf_len - http->_p.buf_off;
253 	char *body_ptr = buf + http->_p.buf_off;
254 
255 	if (http->_p.content_len < body_len)
256 		body_len = http->_p.content_len;
257 	if (body_len == 0)
258 		return;
259 	http->_p.buf_off += body_len;
260 	if (!mog_http_write_full(http->forward, body_ptr, body_len))
261 		(void)write_err(mfd, NULL);
262 }
263 
lengths_ok(struct mog_http * http)264 static bool lengths_ok(struct mog_http *http)
265 {
266 	if (http->_p.content_len < 0)
267 		return false;	/* ERANGE */
268 
269 	if (http->_p.has_content_range) {
270 		if (http->_p.chunked)
271 			return false;
272 
273 		if (http->_p.range_end < 0 || http->_p.range_beg < 0)
274 			return false;	/* ERANGE */
275 
276 		assert(http->_p.range_end >= 0 && http->_p.range_beg >= 0 &&
277 		       "bad range, http_parser.rl broken");
278 
279 		/* can't end after we start */
280 		if (http->_p.range_end < http->_p.range_beg)
281 			return false;
282 
283 		/*
284 		 * Content-Length should match Content-Range boundaries
285 		 * WARNING: Eric Wong sucks at arithmetic, check this:
286 		 */
287 		if (http->_p.content_len >= 0) {
288 			off_t expect = http->_p.range_end -
289 					http->_p.range_beg + 1;
290 
291 			if (http->_p.content_len != expect)
292 				return false;
293 		}
294 	}
295 	return true;
296 }
297 
rnd_init_per_thread(void)298 MOG_NOINLINE static void rnd_init_per_thread(void)
299 {
300 	unsigned seed = (unsigned)((size_t)&rnd >> 1);
301 
302 	CHECK(int, 0,
303 	      initstate_r(seed, rnd.state, sizeof(rnd.state), &rnd.data));
304 	rnd.ready = true;
305 }
306 
tmppath_for(struct mog_http * http,const char * path)307 static char *tmppath_for(struct mog_http *http, const char *path)
308 {
309 	int32_t result;
310 	int rc;
311 	char *s;
312 
313 	if (!rnd.ready)
314 		rnd_init_per_thread();
315 
316 	assert(http && "validation later"); /* TODO */
317 	CHECK(int, 0, random_r(&rnd.data, &result));
318 
319 	rc = asprintf(&s, "%s.%08x.%d.tmp",
320 			path, (unsigned)result, (int)getpid());
321 
322 	return rc >= 0 ? s : 0;
323 }
324 
open_put(struct mog_http * http,char * path)325 static struct mog_file * open_put(struct mog_http *http, char *path)
326 {
327 	struct mog_file *file;
328 
329 	/*
330 	 * we can't do an atomic rename(2) on successful PUT
331 	 * if we have a partial upload
332 	 */
333 	if (http->_p.has_content_range) {
334 		http->forward = mog_file_open_put(http->svc, path, O_CREAT);
335 		if (http->forward == NULL)
336 			return NULL;
337 
338 		file = &http->forward->as.file;
339 		assert(file->tmppath == NULL && file->path == NULL &&
340 		       "file->*path should both be NULL after open");
341 	} else {
342 		char *tmp = tmppath_for(http, path);
343 		int fl = O_EXCL | O_TRUNC | O_CREAT;
344 
345 		if (!tmp)
346 			return NULL;
347 
348 		http->forward = mog_file_open_put(http->svc, tmp, fl);
349 
350 		/* retry once on EEXIST, don't inf loop if RNG is broken */
351 		if (http->forward == NULL && errno == EEXIST) {
352 			free(tmp);
353 			tmp = tmppath_for(http, path);
354 			if (!tmp)
355 				return NULL;
356 			http->forward = mog_file_open_put(http->svc, tmp, fl);
357 		}
358 		if (http->forward == NULL) {
359 			PRESERVE_ERRNO( free(tmp) );
360 			return NULL;
361 		}
362 		file = &http->forward->as.file;
363 		file->tmppath = tmp;
364 	}
365 
366 	file->path = xstrdup(path);
367 	assert(file->foff == 0 && "file->foff should be zero");
368 	if (http->_p.has_content_range)
369 		file->foff = http->_p.range_beg;
370 	if (http->_p.has_md5)
371 		mog_digest_init(&file->digest, GC_MD5);
372 
373 	return file;
374 }
375 
mog_http_put(struct mog_fd * mfd,char * buf,size_t buf_len)376 void mog_http_put(struct mog_fd *mfd, char *buf, size_t buf_len)
377 {
378 	struct mog_http *http = &mfd->as.http;
379 	char *path;
380 	struct mog_file *file;
381 
382 	if (mfd->fd_type == MOG_FD_TYPE_HTTPGET) {
383 		mog_http_resp(mfd, "405 Method Not Allowed", false);
384 		return;
385 	}
386 
387 	path = mog_http_path(http, buf);
388 	if (path == NULL)
389 		goto err;	/* bad path */
390 	assert(http->forward == NULL && "already have http->forward");
391 	assert(path[0] == '/' && "bad path");
392 
393 	TRACE(CMOGSTORED_HTTP_REQ_START(mfd->fd, "PUT", path));
394 
395 	if (!lengths_ok(http)) {
396 		write_err(mfd, "400 Bad Request");
397 		return;
398 	}
399 
400 	file = open_put(http, path);
401 	if (file == NULL)
402 		goto err;
403 
404 	if (buf_len == http->_p.buf_off) {
405 		/* we got the HTTP header in one read() */
406 		if (http->_p.chunked) {
407 			mog_rbuf_reattach_and_null(&http->rbuf);
408 			mog_chunk_init(http);
409 			http->_p.buf_off = buf_len;
410 		}
411 		return;
412 	}
413 	/*
414 	 * otherwise we got part of the request body with the header,
415 	 * write partially read body
416 	 */
417 	assert(buf_len > http->_p.buf_off && http->_p.buf_off > 0
418 	       && "http->_p.buf_off is wrong");
419 
420 	if (http->_p.chunked)
421 		chunked_body_after_header(mfd, buf, buf_len);
422 	else
423 		identity_body_after_header(mfd, buf, buf_len);
424 
425 	return;
426 err:
427 	switch (errno) {
428 	case EINVAL:
429 		mog_http_resp(mfd, "400 Bad Request", false);
430 		return;
431 	case ENOENT:
432 		mog_http_resp(mfd, "404 Not Found", false);
433 		return;
434 	case EACCES:
435 		mog_http_resp(mfd, "403 Forbidden", false);
436 		return;
437 	}
438 	syslog(LOG_ERR, "problem starting PUT for path=%s (%m)", path);
439 	(void)write_err(mfd, NULL);
440 }
441 
/*
 * Return the tcpi_last_data_recv value reported by TCP_INFO for the
 * socket fd.  Returns (unsigned)-1 when TCP_INFO is not available at
 * compile time or getsockopt() fails.
 */
static unsigned last_data_recv(int fd)
{
	unsigned msec = (unsigned)-1;
#ifdef TCP_INFO
	struct tcp_info ti;
	socklen_t ti_len = (socklen_t)sizeof(struct tcp_info);

	if (getsockopt(fd, IPPROTO_TCP, TCP_INFO, &ti, &ti_len) == 0)
		msec = (unsigned)ti.tcpi_last_data_recv;
#endif /* TCP_INFO */
	return msec;
}
454 
read_err_dbg(struct mog_fd * mfd,ssize_t r)455 MOG_NOINLINE static void read_err_dbg(struct mog_fd *mfd, ssize_t r)
456 {
457 	int save_errno = errno;
458 	struct mog_ni ni;
459 	const char *path = "(unknown)";
460 	long long bytes = -1;
461 	const char *errfmt;
462 	unsigned last = last_data_recv(mfd->fd);
463 
464 	mog_nameinfo(&mfd->as.http.mpa, &ni);
465 
466 	if (mfd->as.http.forward) {
467 		path = mfd->as.http.forward->as.file.path;
468 		bytes = (long long)mfd->as.http.forward->as.file.foff;
469 	}
470 
471 #define PFX "PUT %s failed from %s%s after %lld bytes: "
472 	errfmt = (r == 0) ? PFX"premature EOF" : PFX"%m";
473 #undef PFX
474 	errno = save_errno;
475 	syslog(LOG_ERR, errfmt, path, ni.ni_host, ni.ni_serv, bytes);
476 
477 	if (last != (unsigned)-1)
478 		syslog(LOG_ERR, "last_data_recv=%ums from %s%s for PUT %s",
479 		       last, ni.ni_host, ni.ni_serv, path);
480 }
481 
identity_put_in_progress(struct mog_fd * mfd)482 static enum mog_next identity_put_in_progress(struct mog_fd *mfd)
483 {
484 	struct mog_http *http = &mfd->as.http;
485 	ssize_t r;
486 	size_t buf_len;
487 	char *buf;
488 	off_t need;
489 
490 	assert(http->wbuf == NULL && "can't receive file with http->wbuf");
491 	assert(http->forward && http->forward != MOG_IOSTAT && "bad forward");
492 
493 	need = http->_p.content_len - http->forward->as.file.foff;
494 	if (http->_p.has_content_range)
495 		need += http->_p.range_beg;
496 	if (need == 0)
497 		return http_put_commit(mfd);
498 
499 	buf = mog_fsbuf_get(&buf_len);
500 again:
501 	assert(need > 0 && "over-wrote on PUT request");
502 	if (need < buf_len)
503 		buf_len = need;
504 retry:
505 	r = read(mfd->fd, buf, buf_len);
506 	if (r > 0) {
507 		if (!mog_http_write_full(http->forward, buf, r))
508 			return write_err(mfd, NULL);
509 		need -= r;
510 		if (need == 0)
511 			return http_put_commit(mfd);
512 
513 		if (mog_ioq_contended())
514 			return MOG_NEXT_WAIT_RD;
515 		goto again;
516 	}
517 	if (r != 0) {
518 		switch (errno) {
519 		case_EAGAIN: return MOG_NEXT_WAIT_RD;
520 		case EINTR: goto retry;
521 		}
522 	}
523 
524 	/* assume all read() errors mean socket is unwritable, too */
525 	read_err_dbg(mfd, r);
526 	return stop(mfd, NULL);
527 }
528 
/*
 * Continue receiving a chunked PUT body.  Driven by
 * http->_p.chunk_state:
 *
 *  - DATA: http->_p.content_len holds the bytes still missing from the
 *    current chunk; they are read from the socket and written straight
 *    to the destination file.
 *  - SIZE / TRAILER: input is read into a read buffer (the one saved in
 *    http->rbuf, if any, otherwise one from mog_rbuf_get()) and handed
 *    to mog_chunk_parse().
 *
 * Returns:
 *  - the result of http_put_commit() once the parser reaches DONE
 *  - MOG_NEXT_WAIT_RD when the socket has no data (EAGAIN) or the I/O
 *    queue is contended
 *  - the result of write_err() on a file write failure or parse error
 *  - the result of stop() (no response sent) on EOF or a read error
 */
static enum mog_next chunked_put_in_progress(struct mog_fd *mfd)
{
	struct mog_rbuf *rbuf;
	struct mog_http *http = &mfd->as.http;
	ssize_t r;
	size_t buf_len;
	size_t prev_len;
	char *buf;
	/* set once the trailer state is entered, never cleared afterwards */
	bool in_trailer = false;

again:
	assert(http->wbuf == NULL && "can't receive file with http->wbuf");
	assert(http->forward && http->forward != MOG_IOSTAT && "bad forward");

	switch (http->_p.chunk_state) {
	case MOG_CHUNK_STATE_DATA:
		assert(http->rbuf == NULL && "unexpected http->rbuf");
		if (http->_p.content_len == 0) { /* final chunk */
			http->_p.chunk_state = MOG_CHUNK_STATE_TRAILER;
			http->_p.buf_off = 0;
			goto chunk_state_trailer;
		}
		assert(http->_p.content_len > 0 && "bad chunk length");
		/* read the chunk into memory */
		buf = mog_fsbuf_get(&buf_len);
		/* do not read beyond the end of the current chunk */
		if (buf_len > http->_p.content_len)
			buf_len = http->_p.content_len;
		do {
			r = read(mfd->fd, buf, buf_len);
		} while (r < 0 && errno == EINTR);

		if (r <= 0)
			goto read_err;
		if (!mog_http_write_full(http->forward, buf, r))
			return write_err(mfd, NULL);

		http->_p.content_len -= r;

		/* chunk is complete */
		if (http->_p.content_len == 0)
			mog_chunk_init(http);

		if (mog_ioq_contended())
			return MOG_NEXT_WAIT_RD;
		goto again;
	case MOG_CHUNK_STATE_TRAILER:
chunk_state_trailer:
		in_trailer = true;
		/* fall-through */
	case MOG_CHUNK_STATE_SIZE:
		rbuf = http->rbuf;
		if (rbuf) {
			/* append after what was saved from an earlier read */
			prev_len = rbuf->rsize;
			buf_len = rbuf->rcapa - prev_len;
			buf = rbuf->rptr + prev_len;
			/*
			 * buf_len == 0 may happen here if client sends
			 * us very bogus data; read() then returns zero
			 * and we end up in read_err below
			 */
		} else {
			prev_len = 0;
			rbuf = mog_rbuf_get(MOG_RBUF_BASE_SIZE);
			buf_len = rbuf->rcapa;
			buf = rbuf->rptr;
		}
		do {
			r = read(mfd->fd, buf, buf_len);
		} while (r < 0 && errno == EINTR);
		if (r <= 0)
			goto read_err;

		/* parse from the start of the buffer, old data included */
		buf = rbuf->rptr;
		buf_len = r + prev_len;

		switch (mog_chunk_parse(http, buf, buf_len)) {
		case MOG_PARSER_ERROR:
			return write_err(mfd, "400 Bad Request");
		case MOG_PARSER_CONTINUE:
			assert(http->_p.chunk_state != MOG_CHUNK_STATE_DONE);
			/* fall through */
		case MOG_PARSER_DONE:
			switch (http->_p.chunk_state) {
			case MOG_CHUNK_STATE_SIZE:
				if (in_trailer)
					assert(0 && "bad chunk state: size");
				/* client is trickling chunk size :< */
				mog_rbuf_reattach_and_null(&http->rbuf);
				http->_p.buf_off = 0;
				goto again;
			case MOG_CHUNK_STATE_DATA:
				if (in_trailer)
					assert(0 && "bad chunk state: data");
				/* client is trickling final chunk/trailer */
				mog_rbuf_reattach_and_null(&http->rbuf);
				goto again;
			case MOG_CHUNK_STATE_TRAILER:
				/* keep the unparsed tail for the next read */
				stash_advance_rbuf(http, buf, buf_len);
				goto again;
			case MOG_CHUNK_STATE_DONE:
				stash_advance_rbuf(http, buf, buf_len);

				/* pipelined HTTP request after trailers! */
				if (http->rbuf)
					assert(http->rbuf->rsize > 0
					       && http->_p.buf_off == 0
					       && "bad rbuf");
				return http_put_commit(mfd);
			}
		}
		assert(0 && "compiler bug?");
	case MOG_CHUNK_STATE_DONE:
		assert(0 && "invalid state");
	}

read_err:
	/* r <= 0 here: r == 0 is EOF (or a zero-length read), r < 0 an error */
	if (r < 0) {
		switch (errno) {
		case_EAGAIN: return MOG_NEXT_WAIT_RD;
		}
	}
	read_err_dbg(mfd, r);
	return stop(mfd, NULL);
}
651 
mog_http_put_in_progress(struct mog_fd * mfd)652 enum mog_next mog_http_put_in_progress(struct mog_fd *mfd)
653 {
654 	if (mfd->as.http._p.chunked)
655 		return chunked_put_in_progress(mfd);
656 
657 	return identity_put_in_progress(mfd);
658 }
659