1 /* ====================================================================
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 * ====================================================================
19 */
20
21 #include "serf.h"
22 #include "serf_bucket_util.h"
23
24
25 /* Should be an APR_RING? */
26 typedef struct bucket_list {
27 serf_bucket_t *bucket;
28 struct bucket_list *next;
29 } bucket_list_t;
30
31 typedef struct {
32 bucket_list_t *list; /* active buckets */
33 bucket_list_t *last; /* last bucket of the list */
34 bucket_list_t *done; /* we finished reading this; now pending a destroy */
35
36 serf_bucket_aggregate_eof_t hold_open;
37 void *hold_open_baton;
38
39 /* Does this bucket own its children? !0 if yes, 0 if not. */
40 int bucket_owner;
41 } aggregate_context_t;
42
43
cleanup_aggregate(aggregate_context_t * ctx,serf_bucket_alloc_t * allocator)44 static void cleanup_aggregate(aggregate_context_t *ctx,
45 serf_bucket_alloc_t *allocator)
46 {
47 bucket_list_t *next_list;
48
49 /* If we finished reading a bucket during the previous read, then
50 * we can now toss that bucket.
51 */
52 while (ctx->done != NULL) {
53 next_list = ctx->done->next;
54
55 if (ctx->bucket_owner) {
56 serf_bucket_destroy(ctx->done->bucket);
57 }
58 serf_bucket_mem_free(allocator, ctx->done);
59
60 ctx->done = next_list;
61 }
62 }
63
serf_bucket_aggregate_cleanup(serf_bucket_t * bucket,serf_bucket_alloc_t * allocator)64 void serf_bucket_aggregate_cleanup(
65 serf_bucket_t *bucket, serf_bucket_alloc_t *allocator)
66 {
67 aggregate_context_t *ctx = bucket->data;
68
69 cleanup_aggregate(ctx, allocator);
70 }
71
create_aggregate(serf_bucket_alloc_t * allocator)72 static aggregate_context_t *create_aggregate(serf_bucket_alloc_t *allocator)
73 {
74 aggregate_context_t *ctx;
75
76 ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
77
78 ctx->list = NULL;
79 ctx->last = NULL;
80 ctx->done = NULL;
81 ctx->hold_open = NULL;
82 ctx->hold_open_baton = NULL;
83 ctx->bucket_owner = 1;
84
85 return ctx;
86 }
87
serf_bucket_aggregate_create(serf_bucket_alloc_t * allocator)88 serf_bucket_t *serf_bucket_aggregate_create(
89 serf_bucket_alloc_t *allocator)
90 {
91 aggregate_context_t *ctx;
92
93 ctx = create_aggregate(allocator);
94
95 return serf_bucket_create(&serf_bucket_type_aggregate, allocator, ctx);
96 }
97
serf__bucket_stream_create(serf_bucket_alloc_t * allocator,serf_bucket_aggregate_eof_t fn,void * baton)98 serf_bucket_t *serf__bucket_stream_create(
99 serf_bucket_alloc_t *allocator,
100 serf_bucket_aggregate_eof_t fn,
101 void *baton)
102 {
103 serf_bucket_t *bucket = serf_bucket_aggregate_create(allocator);
104 aggregate_context_t *ctx = bucket->data;
105
106 serf_bucket_aggregate_hold_open(bucket, fn, baton);
107
108 ctx->bucket_owner = 0;
109
110 return bucket;
111 }
112
113
serf_aggregate_destroy_and_data(serf_bucket_t * bucket)114 static void serf_aggregate_destroy_and_data(serf_bucket_t *bucket)
115 {
116 aggregate_context_t *ctx = bucket->data;
117 bucket_list_t *next_ctx;
118
119 while (ctx->list) {
120 if (ctx->bucket_owner) {
121 serf_bucket_destroy(ctx->list->bucket);
122 }
123 next_ctx = ctx->list->next;
124 serf_bucket_mem_free(bucket->allocator, ctx->list);
125 ctx->list = next_ctx;
126 }
127 cleanup_aggregate(ctx, bucket->allocator);
128
129 serf_default_destroy_and_data(bucket);
130 }
131
serf_bucket_aggregate_become(serf_bucket_t * bucket)132 void serf_bucket_aggregate_become(serf_bucket_t *bucket)
133 {
134 aggregate_context_t *ctx;
135
136 ctx = create_aggregate(bucket->allocator);
137
138 bucket->type = &serf_bucket_type_aggregate;
139 bucket->data = ctx;
140
141 /* The allocator remains the same. */
142 }
143
144
serf_bucket_aggregate_prepend(serf_bucket_t * aggregate_bucket,serf_bucket_t * prepend_bucket)145 void serf_bucket_aggregate_prepend(
146 serf_bucket_t *aggregate_bucket,
147 serf_bucket_t *prepend_bucket)
148 {
149 aggregate_context_t *ctx = aggregate_bucket->data;
150 bucket_list_t *new_list;
151
152 new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator,
153 sizeof(*new_list));
154 new_list->bucket = prepend_bucket;
155 new_list->next = ctx->list;
156
157 ctx->list = new_list;
158 }
159
serf_bucket_aggregate_append(serf_bucket_t * aggregate_bucket,serf_bucket_t * append_bucket)160 void serf_bucket_aggregate_append(
161 serf_bucket_t *aggregate_bucket,
162 serf_bucket_t *append_bucket)
163 {
164 aggregate_context_t *ctx = aggregate_bucket->data;
165 bucket_list_t *new_list;
166
167 new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator,
168 sizeof(*new_list));
169 new_list->bucket = append_bucket;
170 new_list->next = NULL;
171
172 /* If we use APR_RING, this is trivial. So, wait.
173 new_list->next = ctx->list;
174 ctx->list = new_list;
175 */
176 if (ctx->list == NULL) {
177 ctx->list = new_list;
178 ctx->last = new_list;
179 }
180 else {
181 ctx->last->next = new_list;
182 ctx->last = ctx->last->next;
183 }
184 }
185
serf_bucket_aggregate_hold_open(serf_bucket_t * aggregate_bucket,serf_bucket_aggregate_eof_t fn,void * baton)186 void serf_bucket_aggregate_hold_open(serf_bucket_t *aggregate_bucket,
187 serf_bucket_aggregate_eof_t fn,
188 void *baton)
189 {
190 aggregate_context_t *ctx = aggregate_bucket->data;
191 ctx->hold_open = fn;
192 ctx->hold_open_baton = baton;
193 }
194
serf_bucket_aggregate_prepend_iovec(serf_bucket_t * aggregate_bucket,struct iovec * vecs,int vecs_count)195 void serf_bucket_aggregate_prepend_iovec(
196 serf_bucket_t *aggregate_bucket,
197 struct iovec *vecs,
198 int vecs_count)
199 {
200 int i;
201
202 /* Add in reverse order. */
203 for (i = vecs_count - 1; i >= 0; i--) {
204 serf_bucket_t *new_bucket;
205
206 new_bucket = serf_bucket_simple_create(vecs[i].iov_base,
207 vecs[i].iov_len,
208 NULL, NULL,
209 aggregate_bucket->allocator);
210
211 serf_bucket_aggregate_prepend(aggregate_bucket, new_bucket);
212
213 }
214 }
215
serf_bucket_aggregate_append_iovec(serf_bucket_t * aggregate_bucket,struct iovec * vecs,int vecs_count)216 void serf_bucket_aggregate_append_iovec(
217 serf_bucket_t *aggregate_bucket,
218 struct iovec *vecs,
219 int vecs_count)
220 {
221 serf_bucket_t *new_bucket;
222
223 new_bucket = serf_bucket_iovec_create(vecs, vecs_count,
224 aggregate_bucket->allocator);
225
226 serf_bucket_aggregate_append(aggregate_bucket, new_bucket);
227 }
228
read_aggregate(serf_bucket_t * bucket,apr_size_t requested,int vecs_size,struct iovec * vecs,int * vecs_used)229 static apr_status_t read_aggregate(serf_bucket_t *bucket,
230 apr_size_t requested,
231 int vecs_size, struct iovec *vecs,
232 int *vecs_used)
233 {
234 aggregate_context_t *ctx = bucket->data;
235 int cur_vecs_used;
236 apr_status_t status;
237
238 *vecs_used = 0;
239
240 if (!ctx->list) {
241 if (ctx->hold_open) {
242 return ctx->hold_open(ctx->hold_open_baton, bucket);
243 }
244 else {
245 return APR_EOF;
246 }
247 }
248
249 status = APR_SUCCESS;
250 while (requested) {
251 serf_bucket_t *head = ctx->list->bucket;
252
253 status = serf_bucket_read_iovec(head, requested, vecs_size, vecs,
254 &cur_vecs_used);
255
256 if (SERF_BUCKET_READ_ERROR(status))
257 return status;
258
259 /* Add the number of vecs we read to our running total. */
260 *vecs_used += cur_vecs_used;
261
262 if (cur_vecs_used > 0 || status) {
263 bucket_list_t *next_list;
264
265 /* If we got SUCCESS (w/bytes) or EAGAIN, we want to return now
266 * as it isn't safe to read more without returning to our caller.
267 */
268 if (!status || APR_STATUS_IS_EAGAIN(status) || status == SERF_ERROR_WAIT_CONN) {
269 return status;
270 }
271
272 /* However, if we read EOF, we can stash this bucket in a
273 * to-be-freed list and move on to the next bucket. This ensures
274 * that the bucket stays alive (so as not to violate our read
275 * semantics). We'll destroy this list of buckets the next time
276 * we are asked to perform a read operation - thus ensuring the
277 * proper read lifetime.
278 */
279 next_list = ctx->list->next;
280 ctx->list->next = ctx->done;
281 ctx->done = ctx->list;
282 ctx->list = next_list;
283
284 /* If we have no more in our list, return EOF. */
285 if (!ctx->list) {
286 if (ctx->hold_open) {
287 return ctx->hold_open(ctx->hold_open_baton, bucket);
288 }
289 else {
290 return APR_EOF;
291 }
292 }
293
294 /* At this point, it safe to read the next bucket - if we can. */
295
296 /* If the caller doesn't want ALL_AVAIL, decrement the size
297 * of the items we just read from the list.
298 */
299 if (requested != SERF_READ_ALL_AVAIL) {
300 int i;
301
302 for (i = 0; i < cur_vecs_used; i++)
303 requested -= vecs[i].iov_len;
304 }
305
306 /* Adjust our vecs to account for what we just read. */
307 vecs_size -= cur_vecs_used;
308 vecs += cur_vecs_used;
309
310 /* We reached our max. Oh well. */
311 if (!requested || !vecs_size) {
312 return APR_SUCCESS;
313 }
314 }
315 }
316
317 return status;
318 }
319
serf_aggregate_read(serf_bucket_t * bucket,apr_size_t requested,const char ** data,apr_size_t * len)320 static apr_status_t serf_aggregate_read(serf_bucket_t *bucket,
321 apr_size_t requested,
322 const char **data, apr_size_t *len)
323 {
324 aggregate_context_t *ctx = bucket->data;
325 struct iovec vec;
326 int vecs_used;
327 apr_status_t status;
328
329 cleanup_aggregate(ctx, bucket->allocator);
330
331 status = read_aggregate(bucket, requested, 1, &vec, &vecs_used);
332
333 if (!vecs_used) {
334 *len = 0;
335 }
336 else {
337 *data = vec.iov_base;
338 *len = vec.iov_len;
339 }
340
341 return status;
342 }
343
serf_aggregate_read_iovec(serf_bucket_t * bucket,apr_size_t requested,int vecs_size,struct iovec * vecs,int * vecs_used)344 static apr_status_t serf_aggregate_read_iovec(serf_bucket_t *bucket,
345 apr_size_t requested,
346 int vecs_size,
347 struct iovec *vecs,
348 int *vecs_used)
349 {
350 aggregate_context_t *ctx = bucket->data;
351
352 cleanup_aggregate(ctx, bucket->allocator);
353
354 return read_aggregate(bucket, requested, vecs_size, vecs, vecs_used);
355 }
356
serf_aggregate_readline(serf_bucket_t * bucket,int acceptable,int * found,const char ** data,apr_size_t * len)357 static apr_status_t serf_aggregate_readline(serf_bucket_t *bucket,
358 int acceptable, int *found,
359 const char **data, apr_size_t *len)
360 {
361 aggregate_context_t *ctx = bucket->data;
362 apr_status_t status;
363
364 cleanup_aggregate(ctx, bucket->allocator);
365
366 do {
367 serf_bucket_t *head;
368
369 *len = 0;
370
371 if (!ctx->list) {
372 if (ctx->hold_open) {
373 return ctx->hold_open(ctx->hold_open_baton, bucket);
374 }
375 else {
376 return APR_EOF;
377 }
378 }
379
380 head = ctx->list->bucket;
381
382 status = serf_bucket_readline(head, acceptable, found,
383 data, len);
384 if (SERF_BUCKET_READ_ERROR(status))
385 return status;
386
387 if (status == APR_EOF) {
388 bucket_list_t *next_list;
389
390 /* head bucket is empty, move to to-be-cleaned-up list. */
391 next_list = ctx->list->next;
392 ctx->list->next = ctx->done;
393 ctx->done = ctx->list;
394 ctx->list = next_list;
395
396 /* If we have no more in our list, return EOF. */
397 if (!ctx->list) {
398 if (ctx->hold_open) {
399 return ctx->hold_open(ctx->hold_open_baton, bucket);
400 }
401 else {
402 return APR_EOF;
403 }
404 }
405
406 /* we read something, so bail out and let the appl. read again. */
407 if (*len)
408 status = APR_SUCCESS;
409 }
410
411 /* continue with APR_SUCCESS or APR_EOF and no data read yet. */
412 } while (!*len && status != APR_EAGAIN);
413
414 return status;
415 }
416
serf_aggregate_peek(serf_bucket_t * bucket,const char ** data,apr_size_t * len)417 static apr_status_t serf_aggregate_peek(serf_bucket_t *bucket,
418 const char **data,
419 apr_size_t *len)
420 {
421 aggregate_context_t *ctx = bucket->data;
422 serf_bucket_t *head;
423 apr_status_t status;
424
425 cleanup_aggregate(ctx, bucket->allocator);
426
427 /* Peek the first bucket in the list, if any. */
428 if (!ctx->list) {
429 *len = 0;
430 if (ctx->hold_open) {
431 status = ctx->hold_open(ctx->hold_open_baton, bucket);
432 if (status == APR_EAGAIN)
433 status = APR_SUCCESS;
434 return status;
435 }
436 else {
437 return APR_EOF;
438 }
439 }
440
441 head = ctx->list->bucket;
442
443 status = serf_bucket_peek(head, data, len);
444
445 if (status == APR_EOF) {
446 if (ctx->list->next) {
447 status = APR_SUCCESS;
448 } else {
449 if (ctx->hold_open) {
450 status = ctx->hold_open(ctx->hold_open_baton, bucket);
451 if (status == APR_EAGAIN)
452 status = APR_SUCCESS;
453 return status;
454 }
455 }
456 }
457
458 return status;
459 }
460
serf_aggregate_read_bucket(serf_bucket_t * bucket,const serf_bucket_type_t * type)461 static serf_bucket_t * serf_aggregate_read_bucket(
462 serf_bucket_t *bucket,
463 const serf_bucket_type_t *type)
464 {
465 aggregate_context_t *ctx = bucket->data;
466 serf_bucket_t *found_bucket;
467
468 if (!ctx->list) {
469 return NULL;
470 }
471
472 if (ctx->list->bucket->type == type) {
473 /* Got the bucket. Consume it from our list. */
474 found_bucket = ctx->list->bucket;
475 ctx->list = ctx->list->next;
476 return found_bucket;
477 }
478
479 /* Call read_bucket on first one in our list. */
480 return serf_bucket_read_bucket(ctx->list->bucket, type);
481 }
482
483
484 const serf_bucket_type_t serf_bucket_type_aggregate = {
485 "AGGREGATE",
486 serf_aggregate_read,
487 serf_aggregate_readline,
488 serf_aggregate_read_iovec,
489 serf_default_read_for_sendfile,
490 serf_aggregate_read_bucket,
491 serf_aggregate_peek,
492 serf_aggregate_destroy_and_data,
493 };
494