1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 static nxt_bool_t nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b,
11     size_t *copied);
12 static nxt_buf_t *nxt_sendbuf_coalesce_completion(nxt_task_t *task,
13     nxt_work_queue_t *wq, nxt_buf_t *start);
14 
15 
16 nxt_uint_t
nxt_sendbuf_mem_coalesce0(nxt_task_t * task,nxt_sendbuf_t * sb,struct iovec * iov,nxt_uint_t niov_max)17 nxt_sendbuf_mem_coalesce0(nxt_task_t *task, nxt_sendbuf_t *sb,
18     struct iovec *iov, nxt_uint_t niov_max)
19 {
20     u_char      *last;
21     size_t      size, total;
22     nxt_buf_t   *b;
23     nxt_uint_t  n;
24 
25     total = sb->size;
26     last = NULL;
27     n = (nxt_uint_t) -1;
28 
29     for (b = sb->buf; b != NULL && total < sb->limit; b = b->next) {
30 
31         nxt_prefetch(b->next);
32 
33         if (nxt_buf_is_file(b)) {
34             break;
35         }
36 
37         if (nxt_buf_is_mem(b)) {
38 
39             size = b->mem.free - b->mem.pos;
40 
41             if (size != 0) {
42 
43                 if (total + size > sb->limit) {
44                     size = sb->limit - total;
45 
46                     if (size == 0) {
47                         break;
48                     }
49                 }
50 
51                 if (b->mem.pos != last) {
52 
53                     if (++n >= niov_max) {
54                         goto done;
55                     }
56 
57                     iov[n].iov_base = b->mem.pos;
58                     iov[n].iov_len = size;
59 
60                 } else {
61                     iov[n].iov_len += size;
62                 }
63 
64                 nxt_debug(task, "sendbuf: %ui, %p, %uz",
65                           n, iov[n].iov_base, iov[n].iov_len);
66 
67                 total += size;
68                 last = b->mem.pos + size;
69             }
70 
71         } else {
72             sb->sync = 1;
73             sb->last |= nxt_buf_is_last(b);
74         }
75     }
76 
77     n++;
78 
79 done:
80 
81     sb->buf = b;
82 
83     return n;
84 }
85 
86 
87 nxt_uint_t
nxt_sendbuf_mem_coalesce(nxt_task_t * task,nxt_sendbuf_coalesce_t * sb)88 nxt_sendbuf_mem_coalesce(nxt_task_t *task, nxt_sendbuf_coalesce_t *sb)
89 {
90     u_char      *last;
91     size_t      size, total;
92     nxt_buf_t   *b;
93     nxt_uint_t  n;
94 
95     total = sb->size;
96     last = NULL;
97     n = (nxt_uint_t) -1;
98 
99     for (b = sb->buf; b != NULL && total < sb->limit; b = b->next) {
100 
101         nxt_prefetch(b->next);
102 
103         if (nxt_buf_is_file(b)) {
104             break;
105         }
106 
107         if (nxt_buf_is_mem(b)) {
108 
109             size = b->mem.free - b->mem.pos;
110 
111             if (size != 0) {
112 
113                 if (total + size > sb->limit) {
114                     size = sb->limit - total;
115 
116                     sb->limit_reached = 1;
117 
118                     if (nxt_slow_path(size == 0)) {
119                         break;
120                     }
121                 }
122 
123                 if (b->mem.pos != last) {
124 
125                     if (++n >= sb->nmax) {
126                         sb->nmax_reached = 1;
127 
128                         goto done;
129                     }
130 
131                     sb->iobuf[n].iov_base = b->mem.pos;
132                     sb->iobuf[n].iov_len = size;
133 
134                 } else {
135                     sb->iobuf[n].iov_len += size;
136                 }
137 
138                 nxt_debug(task, "sendbuf: %ui, %p, %uz",
139                           n, sb->iobuf[n].iov_base, sb->iobuf[n].iov_len);
140 
141                 total += size;
142                 last = b->mem.pos + size;
143             }
144 
145         } else {
146             sb->sync = 1;
147             sb->last |= nxt_buf_is_last(b);
148         }
149     }
150 
151     n++;
152 
153 done:
154 
155     sb->buf = b;
156     sb->size = total;
157     sb->niov = n;
158 
159     return n;
160 }
161 
162 
163 size_t
nxt_sendbuf_file_coalesce(nxt_sendbuf_coalesce_t * sb)164 nxt_sendbuf_file_coalesce(nxt_sendbuf_coalesce_t *sb)
165 {
166     size_t     file_start, total;
167     nxt_fd_t   fd;
168     nxt_off_t  size, last;
169     nxt_buf_t  *b;
170 
171     b = sb->buf;
172     fd = b->file->fd;
173 
174     total = sb->size;
175 
176     for ( ;; ) {
177 
178         nxt_prefetch(b->next);
179 
180         size = b->file_end - b->file_pos;
181 
182         if (total + size >= sb->limit) {
183             total = sb->limit;
184             break;
185         }
186 
187         total += size;
188         last = b->file_pos + size;
189 
190         b = b->next;
191 
192         if (b == NULL || !nxt_buf_is_file(b)) {
193             break;
194         }
195 
196         if (b->file_pos != last || b->file->fd != fd) {
197             break;
198         }
199     }
200 
201     sb->buf = b;
202 
203     file_start = sb->size;
204     sb->size = total;
205 
206     return total - file_start;
207 }
208 
209 
210 ssize_t
nxt_sendbuf_copy_coalesce(nxt_conn_t * c,nxt_buf_mem_t * bm,nxt_buf_t * b,size_t limit)211 nxt_sendbuf_copy_coalesce(nxt_conn_t *c, nxt_buf_mem_t *bm, nxt_buf_t *b,
212     size_t limit)
213 {
214     size_t      size, bsize, copied;
215     ssize_t     n;
216     nxt_bool_t  flush;
217 
218     size = nxt_buf_mem_used_size(&b->mem);
219     bsize = nxt_buf_mem_size(bm);
220 
221     if (bsize != 0) {
222 
223         if (size > bsize && bm->pos == bm->free) {
224             /*
225              * A data buffer size is larger than the internal
226              * buffer size and the internal buffer is empty.
227              */
228             goto no_buffer;
229         }
230 
231         if (bm->pos == NULL) {
232             bm->pos = nxt_malloc(bsize);
233             if (nxt_slow_path(bm->pos == NULL)) {
234                 return NXT_ERROR;
235             }
236 
237             bm->start = bm->pos;
238             bm->free = bm->pos;
239             bm->end += (uintptr_t) bm->pos;
240         }
241 
242         copied = 0;
243 
244         flush = nxt_sendbuf_copy(bm, b, &copied);
245 
246         nxt_log_debug(c->socket.log, "sendbuf copy:%uz fl:%b", copied, flush);
247 
248         if (flush == 0) {
249             return copied;
250         }
251 
252         size = nxt_buf_mem_used_size(bm);
253 
254         if (size == 0 && nxt_buf_is_sync(b)) {
255             goto done;
256         }
257 
258         n = c->io->send(c, bm->pos, nxt_min(size, limit));
259 
260         nxt_log_debug(c->socket.log, "sendbuf sent:%z", n);
261 
262         if (n > 0) {
263             bm->pos += n;
264 
265             if (bm->pos == bm->free) {
266                 bm->pos = bm->start;
267                 bm->free = bm->start;
268             }
269 
270             n = 0;
271         }
272 
273         return (copied != 0) ? (ssize_t) copied : n;
274     }
275 
276     /* No internal buffering. */
277 
278     if (size == 0 && nxt_buf_is_sync(b)) {
279         goto done;
280     }
281 
282 no_buffer:
283 
284     return c->io->send(c, b->mem.pos, nxt_min(size, limit));
285 
286 done:
287 
288     nxt_log_debug(c->socket.log, "sendbuf done");
289 
290     return 0;
291 }
292 
293 
294 static nxt_bool_t
nxt_sendbuf_copy(nxt_buf_mem_t * bm,nxt_buf_t * b,size_t * copied)295 nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b, size_t *copied)
296 {
297     size_t      size, bsize;
298     nxt_bool_t  flush;
299 
300     flush = 0;
301 
302     do {
303         nxt_prefetch(b->next);
304 
305         if (nxt_buf_is_mem(b)) {
306             bsize = bm->end - bm->free;
307             size = b->mem.free - b->mem.pos;
308             size = nxt_min(size, bsize);
309 
310             nxt_memcpy(bm->free, b->mem.pos, size);
311 
312             *copied += size;
313             bm->free += size;
314 
315             if (bm->free == bm->end) {
316                 return 1;
317             }
318         }
319 
320         flush |= nxt_buf_is_flush(b) || nxt_buf_is_last(b);
321 
322         b = b->next;
323 
324     } while (b != NULL);
325 
326     return flush;
327 }
328 
329 
330 nxt_buf_t *
nxt_sendbuf_update(nxt_buf_t * b,size_t sent)331 nxt_sendbuf_update(nxt_buf_t *b, size_t sent)
332 {
333     size_t  size;
334 
335     while (b != NULL) {
336 
337         nxt_prefetch(b->next);
338 
339         if (!nxt_buf_is_sync(b)) {
340 
341             size = nxt_buf_used_size(b);
342 
343             if (size != 0) {
344 
345                 if (sent == 0) {
346                     break;
347                 }
348 
349                 if (sent < size) {
350 
351                     if (nxt_buf_is_mem(b)) {
352                         b->mem.pos += sent;
353                     }
354 
355                     if (nxt_buf_is_file(b)) {
356                         b->file_pos += sent;
357                     }
358 
359                     break;
360                 }
361 
362                 /* b->mem.free is NULL in file-only buffer. */
363                 b->mem.pos = b->mem.free;
364 
365                 if (nxt_buf_is_file(b)) {
366                     b->file_pos = b->file_end;
367                 }
368 
369                 sent -= size;
370             }
371         }
372 
373         b = b->next;
374     }
375 
376     return b;
377 }
378 
379 
380 nxt_buf_t *
nxt_sendbuf_completion(nxt_task_t * task,nxt_work_queue_t * wq,nxt_buf_t * b)381 nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b)
382 {
383     while (b != NULL) {
384 
385         if (!nxt_buf_is_sync(b) && nxt_buf_used_size(b) != 0) {
386             break;
387         }
388 
389         b = nxt_sendbuf_coalesce_completion(task, wq, b);
390     }
391 
392     return b;
393 }
394 
395 
396 void
nxt_sendbuf_drain(nxt_task_t * task,nxt_work_queue_t * wq,nxt_buf_t * b)397 nxt_sendbuf_drain(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b)
398 {
399     while (b != NULL) {
400         b = nxt_sendbuf_coalesce_completion(task, wq, b);
401     }
402 }
403 
404 
405 static nxt_buf_t *
nxt_sendbuf_coalesce_completion(nxt_task_t * task,nxt_work_queue_t * wq,nxt_buf_t * start)406 nxt_sendbuf_coalesce_completion(nxt_task_t *task, nxt_work_queue_t *wq,
407     nxt_buf_t *start)
408 {
409     nxt_buf_t           *b, *next, **last, *rest, **last_rest;
410     nxt_work_handler_t  handler;
411 
412     rest = NULL;
413     last_rest = &rest;
414     last = &start->next;
415     b = start;
416     handler = b->completion_handler;
417 
418     for ( ;; ) {
419         next = b->next;
420         if (next == NULL) {
421             break;
422         }
423 
424         b->next = NULL;
425         b = next;
426 
427         if (!nxt_buf_is_sync(b) && nxt_buf_used_size(b) != 0) {
428             *last_rest = b;
429             break;
430         }
431 
432         if (handler == b->completion_handler) {
433             *last = b;
434             last = &b->next;
435 
436         } else {
437             *last_rest = b;
438             last_rest = &b->next;
439         }
440     }
441 
442     nxt_work_queue_add(wq, handler, task, start, start->parent);
443 
444     return rest;
445 }
446