1 /* ====================================================================
2  *    Licensed to the Apache Software Foundation (ASF) under one
3  *    or more contributor license agreements.  See the NOTICE file
4  *    distributed with this work for additional information
5  *    regarding copyright ownership.  The ASF licenses this file
6  *    to you under the Apache License, Version 2.0 (the
7  *    "License"); you may not use this file except in compliance
8  *    with the License.  You may obtain a copy of the License at
9  *
10  *      http://www.apache.org/licenses/LICENSE-2.0
11  *
12  *    Unless required by applicable law or agreed to in writing,
13  *    software distributed under the License is distributed on an
14  *    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  *    KIND, either express or implied.  See the License for the
16  *    specific language governing permissions and limitations
17  *    under the License.
18  * ====================================================================
19  */
20 
21 #include "serf.h"
22 #include "serf_bucket_util.h"
23 
24 
25 /* Should be an APR_RING? */
26 typedef struct bucket_list {
27     serf_bucket_t *bucket;
28     struct bucket_list *next;
29 } bucket_list_t;
30 
31 typedef struct {
32     bucket_list_t *list; /* active buckets */
33     bucket_list_t *last; /* last bucket of the list */
34     bucket_list_t *done; /* we finished reading this; now pending a destroy */
35 
36     serf_bucket_aggregate_eof_t hold_open;
37     void *hold_open_baton;
38 
39     /* Does this bucket own its children? !0 if yes, 0 if not. */
40     int bucket_owner;
41 } aggregate_context_t;
42 
43 
/* Release every node parked on CTX's "done" list.
 *
 * Buckets that reached EOF during an earlier read are kept on that list so
 * the data they handed out stays valid; this routine finally disposes of
 * them.  The child bucket itself is destroyed only when the aggregate owns
 * its children; the list node is always returned to ALLOCATOR.
 */
static void cleanup_aggregate(aggregate_context_t *ctx,
                              serf_bucket_alloc_t *allocator)
{
    bucket_list_t *node;

    while ((node = ctx->done) != NULL) {
        /* Remember the successor before the node's memory goes away. */
        bucket_list_t *rest = node->next;

        if (ctx->bucket_owner) {
            serf_bucket_destroy(node->bucket);
        }
        serf_bucket_mem_free(allocator, node);

        ctx->done = rest;
    }
}
63 
/* Public entry point: dispose of the already-consumed child buckets of the
 * aggregate BUCKET, using ALLOCATOR to release the list nodes.
 */
void serf_bucket_aggregate_cleanup(
    serf_bucket_t *bucket, serf_bucket_alloc_t *allocator)
{
    cleanup_aggregate(bucket->data, allocator);
}
71 
create_aggregate(serf_bucket_alloc_t * allocator)72 static aggregate_context_t *create_aggregate(serf_bucket_alloc_t *allocator)
73 {
74     aggregate_context_t *ctx;
75 
76     ctx = serf_bucket_mem_alloc(allocator, sizeof(*ctx));
77 
78     ctx->list = NULL;
79     ctx->last = NULL;
80     ctx->done = NULL;
81     ctx->hold_open = NULL;
82     ctx->hold_open_baton = NULL;
83     ctx->bucket_owner = 1;
84 
85     return ctx;
86 }
87 
serf_bucket_aggregate_create(serf_bucket_alloc_t * allocator)88 serf_bucket_t *serf_bucket_aggregate_create(
89     serf_bucket_alloc_t *allocator)
90 {
91     aggregate_context_t *ctx;
92 
93     ctx = create_aggregate(allocator);
94 
95     return serf_bucket_create(&serf_bucket_type_aggregate, allocator, ctx);
96 }
97 
serf__bucket_stream_create(serf_bucket_alloc_t * allocator,serf_bucket_aggregate_eof_t fn,void * baton)98 serf_bucket_t *serf__bucket_stream_create(
99     serf_bucket_alloc_t *allocator,
100     serf_bucket_aggregate_eof_t fn,
101     void *baton)
102 {
103     serf_bucket_t *bucket = serf_bucket_aggregate_create(allocator);
104     aggregate_context_t *ctx = bucket->data;
105 
106     serf_bucket_aggregate_hold_open(bucket, fn, baton);
107 
108     ctx->bucket_owner = 0;
109 
110     return bucket;
111 }
112 
113 
/* Destroy handler of the aggregate bucket type.
 *
 * Walks the active list, destroying each child bucket when the aggregate
 * owns its children and freeing every list node, then disposes of the
 * "done" list the same way, and finally hands BUCKET to the default
 * destroy-and-data routine.
 */
static void serf_aggregate_destroy_and_data(serf_bucket_t *bucket)
{
    aggregate_context_t *ctx = bucket->data;
    bucket_list_t *node;

    while ((node = ctx->list) != NULL) {
        bucket_list_t *rest;

        if (ctx->bucket_owner) {
            serf_bucket_destroy(node->bucket);
        }

        rest = node->next;
        serf_bucket_mem_free(bucket->allocator, node);
        ctx->list = rest;
    }

    /* Buckets that were already read to EOF are still waiting for disposal. */
    cleanup_aggregate(ctx, bucket->allocator);

    serf_default_destroy_and_data(bucket);
}
131 
/* Turn an existing BUCKET into an empty aggregate bucket in place.
 *
 * The bucket keeps its allocator; its type is switched to the aggregate
 * type and its data pointer is replaced by a brand new aggregate context.
 * The previous data pointer is overwritten here without being released.
 * NOTE(review): callers presumably dispose of the old data beforehand.
 */
void serf_bucket_aggregate_become(serf_bucket_t *bucket)
{
    aggregate_context_t *state = create_aggregate(bucket->allocator);

    bucket->data = state;
    bucket->type = &serf_bucket_type_aggregate;
}
143 
144 
/* Insert PREPEND_BUCKET at the front of AGGREGATE_BUCKET's active list.
 *
 * A new list node is allocated from the aggregate's allocator and becomes
 * the head of the list, so PREPEND_BUCKET is the next bucket to be read.
 */
void serf_bucket_aggregate_prepend(
    serf_bucket_t *aggregate_bucket,
    serf_bucket_t *prepend_bucket)
{
    aggregate_context_t *ctx = aggregate_bucket->data;
    bucket_list_t *new_list;

    new_list = serf_bucket_mem_alloc(aggregate_bucket->allocator,
                                     sizeof(*new_list));
    new_list->bucket = prepend_bucket;
    new_list->next = ctx->list;

    /* When the active list is empty the new node is the tail as well.
     * Without this, the tail pointer would stay NULL (or keep referring to
     * a node that has left the active list) and a later append, which links
     * through the tail whenever the list is non-empty, would write through
     * an invalid pointer.
     */
    if (ctx->list == NULL) {
        ctx->last = new_list;
    }

    ctx->list = new_list;
}
159 
/* Add APPEND_BUCKET to the end of AGGREGATE_BUCKET's active list.
 *
 * A new list node is allocated from the aggregate's allocator.  If the
 * active list is empty the node becomes both head and tail; otherwise it is
 * linked in behind the current tail.
 */
void serf_bucket_aggregate_append(
    serf_bucket_t *aggregate_bucket,
    serf_bucket_t *append_bucket)
{
    aggregate_context_t *ctx = aggregate_bucket->data;
    bucket_list_t *node = serf_bucket_mem_alloc(aggregate_bucket->allocator,
                                                sizeof(*node));

    node->next = NULL;
    node->bucket = append_bucket;

    if (ctx->list != NULL) {
        ctx->last->next = node;
    }
    else {
        ctx->list = node;
    }

    /* Either way the new node is now the tail. */
    ctx->last = node;
}
185 
/* Install FN (called with BATON) as the hold-open callback of
 * AGGREGATE_BUCKET.
 *
 * The read, readline and peek handlers call it when the aggregate has no
 * active child buckets.  With FN set to NULL those handlers report APR_EOF
 * instead.
 */
void serf_bucket_aggregate_hold_open(serf_bucket_t *aggregate_bucket,
                                     serf_bucket_aggregate_eof_t fn,
                                     void *baton)
{
    aggregate_context_t *state = aggregate_bucket->data;

    state->hold_open_baton = baton;
    state->hold_open = fn;
}
194 
/* Prepend the VECS_COUNT buffers described by VECS to AGGREGATE_BUCKET.
 *
 * Each iovec is wrapped in its own simple bucket (created with NULL for
 * both the free function and its baton).  The entries are prepended last
 * to first so that they end up in their original order at the front of the
 * aggregate.
 */
void serf_bucket_aggregate_prepend_iovec(
    serf_bucket_t *aggregate_bucket,
    struct iovec *vecs,
    int vecs_count)
{
    int idx = vecs_count;

    while (idx-- > 0) {
        struct iovec *vec = &vecs[idx];
        serf_bucket_t *wrapped;

        wrapped = serf_bucket_simple_create(vec->iov_base, vec->iov_len,
                                            NULL, NULL,
                                            aggregate_bucket->allocator);

        serf_bucket_aggregate_prepend(aggregate_bucket, wrapped);
    }
}
215 
/* Append the VECS_COUNT buffers described by VECS to AGGREGATE_BUCKET.
 *
 * All entries are wrapped together in a single iovec bucket, which is then
 * appended to the aggregate.
 */
void serf_bucket_aggregate_append_iovec(
    serf_bucket_t *aggregate_bucket,
    struct iovec *vecs,
    int vecs_count)
{
    serf_bucket_t *wrapped = serf_bucket_iovec_create(
                                 vecs, vecs_count,
                                 aggregate_bucket->allocator);

    serf_bucket_aggregate_append(aggregate_bucket, wrapped);
}
228 
read_aggregate(serf_bucket_t * bucket,apr_size_t requested,int vecs_size,struct iovec * vecs,int * vecs_used)229 static apr_status_t read_aggregate(serf_bucket_t *bucket,
230                                    apr_size_t requested,
231                                    int vecs_size, struct iovec *vecs,
232                                    int *vecs_used)
233 {
234     aggregate_context_t *ctx = bucket->data;
235     int cur_vecs_used;
236     apr_status_t status;
237 
238     *vecs_used = 0;
239 
240     if (!ctx->list) {
241         if (ctx->hold_open) {
242             return ctx->hold_open(ctx->hold_open_baton, bucket);
243         }
244         else {
245             return APR_EOF;
246         }
247     }
248 
249     status = APR_SUCCESS;
250     while (requested) {
251         serf_bucket_t *head = ctx->list->bucket;
252 
253         status = serf_bucket_read_iovec(head, requested, vecs_size, vecs,
254                                         &cur_vecs_used);
255 
256         if (SERF_BUCKET_READ_ERROR(status))
257             return status;
258 
259         /* Add the number of vecs we read to our running total. */
260         *vecs_used += cur_vecs_used;
261 
262         if (cur_vecs_used > 0 || status) {
263             bucket_list_t *next_list;
264 
265             /* If we got SUCCESS (w/bytes) or EAGAIN, we want to return now
266              * as it isn't safe to read more without returning to our caller.
267              */
268             if (!status || APR_STATUS_IS_EAGAIN(status) || status == SERF_ERROR_WAIT_CONN) {
269                 return status;
270             }
271 
272             /* However, if we read EOF, we can stash this bucket in a
273              * to-be-freed list and move on to the next bucket.  This ensures
274              * that the bucket stays alive (so as not to violate our read
275              * semantics).  We'll destroy this list of buckets the next time
276              * we are asked to perform a read operation - thus ensuring the
277              * proper read lifetime.
278              */
279             next_list = ctx->list->next;
280             ctx->list->next = ctx->done;
281             ctx->done = ctx->list;
282             ctx->list = next_list;
283 
284             /* If we have no more in our list, return EOF. */
285             if (!ctx->list) {
286                 if (ctx->hold_open) {
287                     return ctx->hold_open(ctx->hold_open_baton, bucket);
288                 }
289                 else {
290                     return APR_EOF;
291                 }
292             }
293 
294             /* At this point, it safe to read the next bucket - if we can. */
295 
296             /* If the caller doesn't want ALL_AVAIL, decrement the size
297              * of the items we just read from the list.
298              */
299             if (requested != SERF_READ_ALL_AVAIL) {
300                 int i;
301 
302                 for (i = 0; i < cur_vecs_used; i++)
303                     requested -= vecs[i].iov_len;
304             }
305 
306             /* Adjust our vecs to account for what we just read. */
307             vecs_size -= cur_vecs_used;
308             vecs += cur_vecs_used;
309 
310             /* We reached our max.  Oh well. */
311             if (!requested || !vecs_size) {
312                 return APR_SUCCESS;
313             }
314         }
315     }
316 
317     return status;
318 }
319 
serf_aggregate_read(serf_bucket_t * bucket,apr_size_t requested,const char ** data,apr_size_t * len)320 static apr_status_t serf_aggregate_read(serf_bucket_t *bucket,
321                                         apr_size_t requested,
322                                         const char **data, apr_size_t *len)
323 {
324     aggregate_context_t *ctx = bucket->data;
325     struct iovec vec;
326     int vecs_used;
327     apr_status_t status;
328 
329     cleanup_aggregate(ctx, bucket->allocator);
330 
331     status = read_aggregate(bucket, requested, 1, &vec, &vecs_used);
332 
333     if (!vecs_used) {
334         *len = 0;
335     }
336     else {
337         *data = vec.iov_base;
338         *len = vec.iov_len;
339     }
340 
341     return status;
342 }
343 
serf_aggregate_read_iovec(serf_bucket_t * bucket,apr_size_t requested,int vecs_size,struct iovec * vecs,int * vecs_used)344 static apr_status_t serf_aggregate_read_iovec(serf_bucket_t *bucket,
345                                               apr_size_t requested,
346                                               int vecs_size,
347                                               struct iovec *vecs,
348                                               int *vecs_used)
349 {
350     aggregate_context_t *ctx = bucket->data;
351 
352     cleanup_aggregate(ctx, bucket->allocator);
353 
354     return read_aggregate(bucket, requested, vecs_size, vecs, vecs_used);
355 }
356 
serf_aggregate_readline(serf_bucket_t * bucket,int acceptable,int * found,const char ** data,apr_size_t * len)357 static apr_status_t serf_aggregate_readline(serf_bucket_t *bucket,
358                                             int acceptable, int *found,
359                                             const char **data, apr_size_t *len)
360 {
361     aggregate_context_t *ctx = bucket->data;
362     apr_status_t status;
363 
364     cleanup_aggregate(ctx, bucket->allocator);
365 
366     do {
367         serf_bucket_t *head;
368 
369         *len = 0;
370 
371         if (!ctx->list) {
372             if (ctx->hold_open) {
373                 return ctx->hold_open(ctx->hold_open_baton, bucket);
374             }
375             else {
376                 return APR_EOF;
377             }
378         }
379 
380         head = ctx->list->bucket;
381 
382         status = serf_bucket_readline(head, acceptable, found,
383                                       data, len);
384         if (SERF_BUCKET_READ_ERROR(status))
385             return status;
386 
387         if (status == APR_EOF) {
388             bucket_list_t *next_list;
389 
390             /* head bucket is empty, move to to-be-cleaned-up list. */
391             next_list = ctx->list->next;
392             ctx->list->next = ctx->done;
393             ctx->done = ctx->list;
394             ctx->list = next_list;
395 
396             /* If we have no more in our list, return EOF. */
397             if (!ctx->list) {
398                 if (ctx->hold_open) {
399                     return ctx->hold_open(ctx->hold_open_baton, bucket);
400                 }
401                 else {
402                     return APR_EOF;
403                 }
404             }
405 
406             /* we read something, so bail out and let the appl. read again. */
407             if (*len)
408                 status = APR_SUCCESS;
409         }
410 
411         /* continue with APR_SUCCESS or APR_EOF and no data read yet. */
412     } while (!*len && status != APR_EAGAIN);
413 
414     return status;
415 }
416 
serf_aggregate_peek(serf_bucket_t * bucket,const char ** data,apr_size_t * len)417 static apr_status_t serf_aggregate_peek(serf_bucket_t *bucket,
418                                         const char **data,
419                                         apr_size_t *len)
420 {
421     aggregate_context_t *ctx = bucket->data;
422     serf_bucket_t *head;
423     apr_status_t status;
424 
425     cleanup_aggregate(ctx, bucket->allocator);
426 
427     /* Peek the first bucket in the list, if any. */
428     if (!ctx->list) {
429         *len = 0;
430         if (ctx->hold_open) {
431             status = ctx->hold_open(ctx->hold_open_baton, bucket);
432             if (status == APR_EAGAIN)
433                 status = APR_SUCCESS;
434             return status;
435         }
436         else {
437             return APR_EOF;
438         }
439     }
440 
441     head = ctx->list->bucket;
442 
443     status = serf_bucket_peek(head, data, len);
444 
445     if (status == APR_EOF) {
446         if (ctx->list->next) {
447             status = APR_SUCCESS;
448         } else {
449             if (ctx->hold_open) {
450                 status = ctx->hold_open(ctx->hold_open_baton, bucket);
451                 if (status == APR_EAGAIN)
452                     status = APR_SUCCESS;
453                 return status;
454             }
455         }
456     }
457 
458     return status;
459 }
460 
/* read_bucket handler of the aggregate bucket type.
 *
 * Returns NULL when the aggregate has no active child.  If the head child
 * is of TYPE, it is removed from the aggregate and returned; the aggregate
 * no longer references it afterwards.  Otherwise the request is forwarded
 * to the head child's own read_bucket handler.
 */
static serf_bucket_t * serf_aggregate_read_bucket(
    serf_bucket_t *bucket,
    const serf_bucket_type_t *type)
{
    aggregate_context_t *ctx = bucket->data;
    bucket_list_t *head = ctx->list;
    serf_bucket_t *found_bucket;

    if (!head) {
        return NULL;
    }

    if (head->bucket->type == type) {
        /* Got the bucket. Consume it from our list. */
        found_bucket = head->bucket;
        ctx->list = head->next;

        /* Do not leave the tail pointer referring to the node that is
         * about to be freed.
         */
        if (ctx->list == NULL) {
            ctx->last = NULL;
        }

        /* The bucket is handed to the caller, but the list node that held
         * it belongs to us and would leak if it were merely unlinked.
         */
        serf_bucket_mem_free(bucket->allocator, head);

        return found_bucket;
    }

    /* Call read_bucket on first one in our list. */
    return serf_bucket_read_bucket(head->bucket, type);
}
482 
483 
484 const serf_bucket_type_t serf_bucket_type_aggregate = {
485     "AGGREGATE",
486     serf_aggregate_read,
487     serf_aggregate_readline,
488     serf_aggregate_read_iovec,
489     serf_default_read_for_sendfile,
490     serf_aggregate_read_bucket,
491     serf_aggregate_peek,
492     serf_aggregate_destroy_and_data,
493 };
494