1 /*
2  * Copyright (c) 2009, 2011, 2012 by Farsight Security, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *    http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 /* Import. */
18 
19 #include "private.h"
20 #include "libmy/tree.h"
21 
22 /* Forward. */
23 
24 static nmsg_res reassemble_frags(nmsg_input_t, Nmsg__Nmsg **, struct nmsg_frag *);
25 
26 /* Red-black nmsg_frag glue. */
27 
28 static int
frag_cmp(struct nmsg_frag * e1,struct nmsg_frag * e2)29 frag_cmp(struct nmsg_frag *e1, struct nmsg_frag *e2) {
30 	return (memcmp(&e1->key, &e2->key, sizeof(struct nmsg_frag_key)));
31 }
32 
RB_PROTOTYPE(frag_ent,nmsg_frag,link,frag_cmp)33 RB_PROTOTYPE(frag_ent, nmsg_frag, link, frag_cmp)
34 RB_GENERATE(frag_ent, nmsg_frag, link, frag_cmp)
35 
36 /* Convenience macros. */
37 
38 #define FRAG_INSERT(stream, fent) do { \
39 	RB_INSERT(frag_ent, &((stream)->nft.head), fent); \
40 } while(0)
41 
42 #define FRAG_FIND(stream, fent, find) do { \
43 	fent = RB_FIND(frag_ent, &((stream)->nft.head), find); \
44 } while(0)
45 
46 #define FRAG_REMOVE(stream, fent) do { \
47 	RB_REMOVE(frag_ent, &((stream)->nft.head), fent); \
48 } while(0)
49 
50 #define FRAG_NEXT(stream, fent, fent_next) do { \
51 	fent_next = RB_NEXT(frag_ent, &((stream)->nft.head), fent); \
52 } while(0)
53 
54 /* Internal functions. */
55 
56 nmsg_res
57 _input_frag_read(nmsg_input_t input, Nmsg__Nmsg **nmsg, uint8_t *buf, size_t buf_len) {
58 	Nmsg__NmsgFragment *nfrag;
59 	nmsg_res res;
60 	struct nmsg_frag *fent, find;
61 
62 	res = nmsg_res_again;
63 
64 	nfrag = nmsg__nmsg_fragment__unpack(NULL, buf_len, buf);
65 	if (nfrag == NULL)
66 		return (nmsg_res_parse_error);
67 
68 	/* find the fragment, else allocate a node and insert into the tree */
69 	memset(&find, 0, sizeof(find));
70 	find.key.id = nfrag->id;
71 	find.key.crc = nfrag->crc;
72 	memcpy(&find.key.addr_ss, &input->stream->addr_ss, sizeof(input->stream->addr_ss));
73 
74 	FRAG_FIND(input->stream, fent, &find);
75 	if (fent == NULL) {
76 		fent = calloc(1, sizeof(*fent));
77 		if (fent == NULL) {
78 			res = nmsg_res_memfail;
79 			goto read_input_frag_out;
80 		}
81 		fent->key.id = nfrag->id;
82 		fent->key.crc = nfrag->crc;
83 		memcpy(&fent->key.addr_ss, &input->stream->addr_ss, sizeof(input->stream->addr_ss));
84 		fent->last = nfrag->last;
85 		fent->rem = nfrag->last + 1;
86 		fent->ts = input->stream->now;
87 		fent->frags = calloc(1, sizeof(ProtobufCBinaryData) *
88 				     (fent->last + 1));
89 		if (fent->frags == NULL) {
90 			free(fent);
91 			res = nmsg_res_memfail;
92 			goto read_input_frag_out;
93 		}
94 		FRAG_INSERT(input->stream, fent);
95 		input->stream->nfrags += 1;
96 	} else {
97 		assert(fent->last == nfrag->last);
98 	}
99 
100 	if (fent->frags[nfrag->current].data != NULL) {
101 		/* fragment has already been received, network problem? */
102 		goto read_input_frag_out;
103 	}
104 
105 	/* attach the fragment payload to the tree node */
106 	fent->frags[nfrag->current] = nfrag->fragment;
107 
108 	/* decrement number of remaining fragments */
109 	fent->rem -= 1;
110 
111 	/* detach the fragment payload from the NmsgFragment */
112 	nfrag->fragment.len = 0;
113 	nfrag->fragment.data = NULL;
114 
115 	/* reassemble if all the fragments have been gathered */
116 	if (fent->rem == 0)
117 		res = reassemble_frags(input, nmsg, fent);
118 
119 read_input_frag_out:
120 	nmsg__nmsg_fragment__free_unpacked(nfrag, NULL);
121 	return (res);
122 }
123 
124 void
_input_frag_destroy(struct nmsg_stream_input * stream)125 _input_frag_destroy(struct nmsg_stream_input *stream) {
126 	struct nmsg_frag *fent, *fent_next;
127 	unsigned i;
128 
129 	for (fent = RB_MIN(frag_ent, &(stream->nft.head));
130 	     fent != NULL;
131 	     fent = fent_next)
132 	{
133 		FRAG_NEXT(stream, fent, fent_next);
134 		for (i = 0; i <= fent->last; i++)
135 			free(fent->frags[i].data);
136 		free(fent->frags);
137 		FRAG_REMOVE(stream, fent);
138 		free(fent);
139 	}
140 }
141 
142 void
_input_frag_gc(struct nmsg_stream_input * stream)143 _input_frag_gc(struct nmsg_stream_input *stream) {
144 	struct nmsg_frag *fent, *fent_next;
145 	unsigned i;
146 
147 	if (!(stream->nfrags > 0 &&
148 	      stream->now.tv_sec - stream->lastgc.tv_sec >= NMSG_FRAG_GC_INTERVAL))
149 	{
150 		return;
151 	}
152 
153 	for (fent = RB_MIN(frag_ent, &(stream->nft.head));
154 	     fent != NULL;
155 	     fent = fent_next)
156 	{
157 		FRAG_NEXT(stream, fent, fent_next);
158 		if (stream->now.tv_sec - fent->ts.tv_sec >=
159 		    NMSG_FRAG_GC_INTERVAL)
160 		{
161 			FRAG_NEXT(stream, fent, fent_next);
162 			for (i = 0; i <= fent->last; i++)
163 				free(fent->frags[i].data);
164 			free(fent->frags);
165 			FRAG_REMOVE(stream, fent);
166 			free(fent);
167 			stream->nfrags -= 1;
168 		}
169 	}
170 
171 	stream->lastgc = stream->now;
172 }
173 
174 /* Private functions. */
175 
176 static nmsg_res
reassemble_frags(nmsg_input_t input,Nmsg__Nmsg ** nmsg,struct nmsg_frag * fent)177 reassemble_frags(nmsg_input_t input, Nmsg__Nmsg **nmsg, struct nmsg_frag *fent) {
178 	nmsg_res res;
179 	size_t len, padded_len;
180 	uint8_t *payload, *ptr;
181 	unsigned i;
182 
183 	/* obtain total length of reassembled payload */
184 	len = 0;
185 	for (i = 0; i <= fent->last; i++) {
186 		assert(fent->frags[i].data != NULL);
187 		len += fent->frags[i].len;
188 	}
189 
190 	/* round total length up to nearest kilobyte */
191 	padded_len = len;
192 	if (len % 1024 != 0)
193 		padded_len += 1024 - (len % 1024);
194 
195 	ptr = payload = malloc(padded_len);
196 	if (payload == NULL) {
197 		return (nmsg_res_memfail);
198 	}
199 
200 	/* copy into the payload buffer and deallocate frags */
201 	for (i = 0; i <= fent->last; i++) {
202 		memcpy(ptr, fent->frags[i].data, fent->frags[i].len);
203 		free(fent->frags[i].data);
204 		ptr += fent->frags[i].len;
205 	}
206 	free(fent->frags);
207 
208 	/* decompress */
209 	if (input->stream->flags & NMSG_FLAG_ZLIB) {
210 		size_t u_len;
211 		u_char *u_buf, *z_buf;
212 
213 		z_buf = (u_char *) payload;
214 		res = nmsg_zbuf_inflate(input->stream->zb, len, z_buf,
215 					&u_len, &u_buf);
216 		if (res != nmsg_res_success) {
217 			free(payload);
218 			goto reassemble_frags_out;
219 		}
220 		payload = u_buf;
221 		len = u_len;
222 		free(z_buf);
223 	}
224 
225 	/* unpack the defragmented payload */
226 	*nmsg = nmsg__nmsg__unpack(NULL, len, payload);
227 	if (*nmsg != NULL)
228 		res = nmsg_res_success;
229 	else
230 		res = nmsg_res_parse_error;
231 	free(payload);
232 
233 reassemble_frags_out:
234 	/* deallocate from tree */
235 	input->stream->nfrags -= 1;
236 	FRAG_REMOVE(input->stream, fent);
237 	free(fent);
238 
239 	return (res);
240 }
241 
242