1 /*
2 * Copyright (c) 2009, 2011, 2012 by Farsight Security, Inc.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 /* Import. */
18
19 #include "private.h"
20 #include "libmy/tree.h"
21
22 /* Forward. */
23
24 static nmsg_res reassemble_frags(nmsg_input_t, Nmsg__Nmsg **, struct nmsg_frag *);
25
26 /* Red-black nmsg_frag glue. */
27
28 static int
frag_cmp(struct nmsg_frag * e1,struct nmsg_frag * e2)29 frag_cmp(struct nmsg_frag *e1, struct nmsg_frag *e2) {
30 return (memcmp(&e1->key, &e2->key, sizeof(struct nmsg_frag_key)));
31 }
32
RB_PROTOTYPE(frag_ent,nmsg_frag,link,frag_cmp)33 RB_PROTOTYPE(frag_ent, nmsg_frag, link, frag_cmp)
34 RB_GENERATE(frag_ent, nmsg_frag, link, frag_cmp)
35
36 /* Convenience macros. */
37
/* Link the entry 'node' into the fragment tree of 'strm'. */
#define FRAG_INSERT(strm, node) do { \
	RB_INSERT(frag_ent, &((strm)->nft.head), (node)); \
} while (0)
41
/*
 * Look up the entry whose key equals that of 'probe' in the fragment tree
 * of 'strm' and store the RB_FIND() result in 'result'.
 */
#define FRAG_FIND(strm, result, probe) do { \
	(result) = RB_FIND(frag_ent, &((strm)->nft.head), (probe)); \
} while (0)
45
/* Unlink the entry 'node' from the fragment tree of 'strm' (no freeing). */
#define FRAG_REMOVE(strm, node) do { \
	RB_REMOVE(frag_ent, &((strm)->nft.head), (node)); \
} while (0)
49
/*
 * Store in 'succ' the RB_NEXT() successor of 'node' within the fragment
 * tree of 'strm'.
 */
#define FRAG_NEXT(strm, node, succ) do { \
	(succ) = RB_NEXT(frag_ent, &((strm)->nft.head), (node)); \
} while (0)
53
54 /* Internal functions. */
55
56 nmsg_res
57 _input_frag_read(nmsg_input_t input, Nmsg__Nmsg **nmsg, uint8_t *buf, size_t buf_len) {
58 Nmsg__NmsgFragment *nfrag;
59 nmsg_res res;
60 struct nmsg_frag *fent, find;
61
62 res = nmsg_res_again;
63
64 nfrag = nmsg__nmsg_fragment__unpack(NULL, buf_len, buf);
65 if (nfrag == NULL)
66 return (nmsg_res_parse_error);
67
68 /* find the fragment, else allocate a node and insert into the tree */
69 memset(&find, 0, sizeof(find));
70 find.key.id = nfrag->id;
71 find.key.crc = nfrag->crc;
72 memcpy(&find.key.addr_ss, &input->stream->addr_ss, sizeof(input->stream->addr_ss));
73
74 FRAG_FIND(input->stream, fent, &find);
75 if (fent == NULL) {
76 fent = calloc(1, sizeof(*fent));
77 if (fent == NULL) {
78 res = nmsg_res_memfail;
79 goto read_input_frag_out;
80 }
81 fent->key.id = nfrag->id;
82 fent->key.crc = nfrag->crc;
83 memcpy(&fent->key.addr_ss, &input->stream->addr_ss, sizeof(input->stream->addr_ss));
84 fent->last = nfrag->last;
85 fent->rem = nfrag->last + 1;
86 fent->ts = input->stream->now;
87 fent->frags = calloc(1, sizeof(ProtobufCBinaryData) *
88 (fent->last + 1));
89 if (fent->frags == NULL) {
90 free(fent);
91 res = nmsg_res_memfail;
92 goto read_input_frag_out;
93 }
94 FRAG_INSERT(input->stream, fent);
95 input->stream->nfrags += 1;
96 } else {
97 assert(fent->last == nfrag->last);
98 }
99
100 if (fent->frags[nfrag->current].data != NULL) {
101 /* fragment has already been received, network problem? */
102 goto read_input_frag_out;
103 }
104
105 /* attach the fragment payload to the tree node */
106 fent->frags[nfrag->current] = nfrag->fragment;
107
108 /* decrement number of remaining fragments */
109 fent->rem -= 1;
110
111 /* detach the fragment payload from the NmsgFragment */
112 nfrag->fragment.len = 0;
113 nfrag->fragment.data = NULL;
114
115 /* reassemble if all the fragments have been gathered */
116 if (fent->rem == 0)
117 res = reassemble_frags(input, nmsg, fent);
118
119 read_input_frag_out:
120 nmsg__nmsg_fragment__free_unpacked(nfrag, NULL);
121 return (res);
122 }
123
124 void
_input_frag_destroy(struct nmsg_stream_input * stream)125 _input_frag_destroy(struct nmsg_stream_input *stream) {
126 struct nmsg_frag *fent, *fent_next;
127 unsigned i;
128
129 for (fent = RB_MIN(frag_ent, &(stream->nft.head));
130 fent != NULL;
131 fent = fent_next)
132 {
133 FRAG_NEXT(stream, fent, fent_next);
134 for (i = 0; i <= fent->last; i++)
135 free(fent->frags[i].data);
136 free(fent->frags);
137 FRAG_REMOVE(stream, fent);
138 free(fent);
139 }
140 }
141
142 void
_input_frag_gc(struct nmsg_stream_input * stream)143 _input_frag_gc(struct nmsg_stream_input *stream) {
144 struct nmsg_frag *fent, *fent_next;
145 unsigned i;
146
147 if (!(stream->nfrags > 0 &&
148 stream->now.tv_sec - stream->lastgc.tv_sec >= NMSG_FRAG_GC_INTERVAL))
149 {
150 return;
151 }
152
153 for (fent = RB_MIN(frag_ent, &(stream->nft.head));
154 fent != NULL;
155 fent = fent_next)
156 {
157 FRAG_NEXT(stream, fent, fent_next);
158 if (stream->now.tv_sec - fent->ts.tv_sec >=
159 NMSG_FRAG_GC_INTERVAL)
160 {
161 FRAG_NEXT(stream, fent, fent_next);
162 for (i = 0; i <= fent->last; i++)
163 free(fent->frags[i].data);
164 free(fent->frags);
165 FRAG_REMOVE(stream, fent);
166 free(fent);
167 stream->nfrags -= 1;
168 }
169 }
170
171 stream->lastgc = stream->now;
172 }
173
174 /* Private functions. */
175
176 static nmsg_res
reassemble_frags(nmsg_input_t input,Nmsg__Nmsg ** nmsg,struct nmsg_frag * fent)177 reassemble_frags(nmsg_input_t input, Nmsg__Nmsg **nmsg, struct nmsg_frag *fent) {
178 nmsg_res res;
179 size_t len, padded_len;
180 uint8_t *payload, *ptr;
181 unsigned i;
182
183 /* obtain total length of reassembled payload */
184 len = 0;
185 for (i = 0; i <= fent->last; i++) {
186 assert(fent->frags[i].data != NULL);
187 len += fent->frags[i].len;
188 }
189
190 /* round total length up to nearest kilobyte */
191 padded_len = len;
192 if (len % 1024 != 0)
193 padded_len += 1024 - (len % 1024);
194
195 ptr = payload = malloc(padded_len);
196 if (payload == NULL) {
197 return (nmsg_res_memfail);
198 }
199
200 /* copy into the payload buffer and deallocate frags */
201 for (i = 0; i <= fent->last; i++) {
202 memcpy(ptr, fent->frags[i].data, fent->frags[i].len);
203 free(fent->frags[i].data);
204 ptr += fent->frags[i].len;
205 }
206 free(fent->frags);
207
208 /* decompress */
209 if (input->stream->flags & NMSG_FLAG_ZLIB) {
210 size_t u_len;
211 u_char *u_buf, *z_buf;
212
213 z_buf = (u_char *) payload;
214 res = nmsg_zbuf_inflate(input->stream->zb, len, z_buf,
215 &u_len, &u_buf);
216 if (res != nmsg_res_success) {
217 free(payload);
218 goto reassemble_frags_out;
219 }
220 payload = u_buf;
221 len = u_len;
222 free(z_buf);
223 }
224
225 /* unpack the defragmented payload */
226 *nmsg = nmsg__nmsg__unpack(NULL, len, payload);
227 if (*nmsg != NULL)
228 res = nmsg_res_success;
229 else
230 res = nmsg_res_parse_error;
231 free(payload);
232
233 reassemble_frags_out:
234 /* deallocate from tree */
235 input->stream->nfrags -= 1;
236 FRAG_REMOVE(input->stream, fent);
237 free(fent);
238
239 return (res);
240 }
241
242