1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7 #include <nxt_main.h>
8
9
10 static nxt_bool_t nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b,
11 size_t *copied);
12 static nxt_buf_t *nxt_sendbuf_coalesce_completion(nxt_task_t *task,
13 nxt_work_queue_t *wq, nxt_buf_t *start);
14
15
16 nxt_uint_t
nxt_sendbuf_mem_coalesce0(nxt_task_t * task,nxt_sendbuf_t * sb,struct iovec * iov,nxt_uint_t niov_max)17 nxt_sendbuf_mem_coalesce0(nxt_task_t *task, nxt_sendbuf_t *sb,
18 struct iovec *iov, nxt_uint_t niov_max)
19 {
20 u_char *last;
21 size_t size, total;
22 nxt_buf_t *b;
23 nxt_uint_t n;
24
25 total = sb->size;
26 last = NULL;
27 n = (nxt_uint_t) -1;
28
29 for (b = sb->buf; b != NULL && total < sb->limit; b = b->next) {
30
31 nxt_prefetch(b->next);
32
33 if (nxt_buf_is_file(b)) {
34 break;
35 }
36
37 if (nxt_buf_is_mem(b)) {
38
39 size = b->mem.free - b->mem.pos;
40
41 if (size != 0) {
42
43 if (total + size > sb->limit) {
44 size = sb->limit - total;
45
46 if (size == 0) {
47 break;
48 }
49 }
50
51 if (b->mem.pos != last) {
52
53 if (++n >= niov_max) {
54 goto done;
55 }
56
57 iov[n].iov_base = b->mem.pos;
58 iov[n].iov_len = size;
59
60 } else {
61 iov[n].iov_len += size;
62 }
63
64 nxt_debug(task, "sendbuf: %ui, %p, %uz",
65 n, iov[n].iov_base, iov[n].iov_len);
66
67 total += size;
68 last = b->mem.pos + size;
69 }
70
71 } else {
72 sb->sync = 1;
73 sb->last |= nxt_buf_is_last(b);
74 }
75 }
76
77 n++;
78
79 done:
80
81 sb->buf = b;
82
83 return n;
84 }
85
86
87 nxt_uint_t
nxt_sendbuf_mem_coalesce(nxt_task_t * task,nxt_sendbuf_coalesce_t * sb)88 nxt_sendbuf_mem_coalesce(nxt_task_t *task, nxt_sendbuf_coalesce_t *sb)
89 {
90 u_char *last;
91 size_t size, total;
92 nxt_buf_t *b;
93 nxt_uint_t n;
94
95 total = sb->size;
96 last = NULL;
97 n = (nxt_uint_t) -1;
98
99 for (b = sb->buf; b != NULL && total < sb->limit; b = b->next) {
100
101 nxt_prefetch(b->next);
102
103 if (nxt_buf_is_file(b)) {
104 break;
105 }
106
107 if (nxt_buf_is_mem(b)) {
108
109 size = b->mem.free - b->mem.pos;
110
111 if (size != 0) {
112
113 if (total + size > sb->limit) {
114 size = sb->limit - total;
115
116 sb->limit_reached = 1;
117
118 if (nxt_slow_path(size == 0)) {
119 break;
120 }
121 }
122
123 if (b->mem.pos != last) {
124
125 if (++n >= sb->nmax) {
126 sb->nmax_reached = 1;
127
128 goto done;
129 }
130
131 sb->iobuf[n].iov_base = b->mem.pos;
132 sb->iobuf[n].iov_len = size;
133
134 } else {
135 sb->iobuf[n].iov_len += size;
136 }
137
138 nxt_debug(task, "sendbuf: %ui, %p, %uz",
139 n, sb->iobuf[n].iov_base, sb->iobuf[n].iov_len);
140
141 total += size;
142 last = b->mem.pos + size;
143 }
144
145 } else {
146 sb->sync = 1;
147 sb->last |= nxt_buf_is_last(b);
148 }
149 }
150
151 n++;
152
153 done:
154
155 sb->buf = b;
156 sb->size = total;
157 sb->niov = n;
158
159 return n;
160 }
161
162
163 size_t
nxt_sendbuf_file_coalesce(nxt_sendbuf_coalesce_t * sb)164 nxt_sendbuf_file_coalesce(nxt_sendbuf_coalesce_t *sb)
165 {
166 size_t file_start, total;
167 nxt_fd_t fd;
168 nxt_off_t size, last;
169 nxt_buf_t *b;
170
171 b = sb->buf;
172 fd = b->file->fd;
173
174 total = sb->size;
175
176 for ( ;; ) {
177
178 nxt_prefetch(b->next);
179
180 size = b->file_end - b->file_pos;
181
182 if (total + size >= sb->limit) {
183 total = sb->limit;
184 break;
185 }
186
187 total += size;
188 last = b->file_pos + size;
189
190 b = b->next;
191
192 if (b == NULL || !nxt_buf_is_file(b)) {
193 break;
194 }
195
196 if (b->file_pos != last || b->file->fd != fd) {
197 break;
198 }
199 }
200
201 sb->buf = b;
202
203 file_start = sb->size;
204 sb->size = total;
205
206 return total - file_start;
207 }
208
209
210 ssize_t
nxt_sendbuf_copy_coalesce(nxt_conn_t * c,nxt_buf_mem_t * bm,nxt_buf_t * b,size_t limit)211 nxt_sendbuf_copy_coalesce(nxt_conn_t *c, nxt_buf_mem_t *bm, nxt_buf_t *b,
212 size_t limit)
213 {
214 size_t size, bsize, copied;
215 ssize_t n;
216 nxt_bool_t flush;
217
218 size = nxt_buf_mem_used_size(&b->mem);
219 bsize = nxt_buf_mem_size(bm);
220
221 if (bsize != 0) {
222
223 if (size > bsize && bm->pos == bm->free) {
224 /*
225 * A data buffer size is larger than the internal
226 * buffer size and the internal buffer is empty.
227 */
228 goto no_buffer;
229 }
230
231 if (bm->pos == NULL) {
232 bm->pos = nxt_malloc(bsize);
233 if (nxt_slow_path(bm->pos == NULL)) {
234 return NXT_ERROR;
235 }
236
237 bm->start = bm->pos;
238 bm->free = bm->pos;
239 bm->end += (uintptr_t) bm->pos;
240 }
241
242 copied = 0;
243
244 flush = nxt_sendbuf_copy(bm, b, &copied);
245
246 nxt_log_debug(c->socket.log, "sendbuf copy:%uz fl:%b", copied, flush);
247
248 if (flush == 0) {
249 return copied;
250 }
251
252 size = nxt_buf_mem_used_size(bm);
253
254 if (size == 0 && nxt_buf_is_sync(b)) {
255 goto done;
256 }
257
258 n = c->io->send(c, bm->pos, nxt_min(size, limit));
259
260 nxt_log_debug(c->socket.log, "sendbuf sent:%z", n);
261
262 if (n > 0) {
263 bm->pos += n;
264
265 if (bm->pos == bm->free) {
266 bm->pos = bm->start;
267 bm->free = bm->start;
268 }
269
270 n = 0;
271 }
272
273 return (copied != 0) ? (ssize_t) copied : n;
274 }
275
276 /* No internal buffering. */
277
278 if (size == 0 && nxt_buf_is_sync(b)) {
279 goto done;
280 }
281
282 no_buffer:
283
284 return c->io->send(c, b->mem.pos, nxt_min(size, limit));
285
286 done:
287
288 nxt_log_debug(c->socket.log, "sendbuf done");
289
290 return 0;
291 }
292
293
294 static nxt_bool_t
nxt_sendbuf_copy(nxt_buf_mem_t * bm,nxt_buf_t * b,size_t * copied)295 nxt_sendbuf_copy(nxt_buf_mem_t *bm, nxt_buf_t *b, size_t *copied)
296 {
297 size_t size, bsize;
298 nxt_bool_t flush;
299
300 flush = 0;
301
302 do {
303 nxt_prefetch(b->next);
304
305 if (nxt_buf_is_mem(b)) {
306 bsize = bm->end - bm->free;
307 size = b->mem.free - b->mem.pos;
308 size = nxt_min(size, bsize);
309
310 nxt_memcpy(bm->free, b->mem.pos, size);
311
312 *copied += size;
313 bm->free += size;
314
315 if (bm->free == bm->end) {
316 return 1;
317 }
318 }
319
320 flush |= nxt_buf_is_flush(b) || nxt_buf_is_last(b);
321
322 b = b->next;
323
324 } while (b != NULL);
325
326 return flush;
327 }
328
329
330 nxt_buf_t *
nxt_sendbuf_update(nxt_buf_t * b,size_t sent)331 nxt_sendbuf_update(nxt_buf_t *b, size_t sent)
332 {
333 size_t size;
334
335 while (b != NULL) {
336
337 nxt_prefetch(b->next);
338
339 if (!nxt_buf_is_sync(b)) {
340
341 size = nxt_buf_used_size(b);
342
343 if (size != 0) {
344
345 if (sent == 0) {
346 break;
347 }
348
349 if (sent < size) {
350
351 if (nxt_buf_is_mem(b)) {
352 b->mem.pos += sent;
353 }
354
355 if (nxt_buf_is_file(b)) {
356 b->file_pos += sent;
357 }
358
359 break;
360 }
361
362 /* b->mem.free is NULL in file-only buffer. */
363 b->mem.pos = b->mem.free;
364
365 if (nxt_buf_is_file(b)) {
366 b->file_pos = b->file_end;
367 }
368
369 sent -= size;
370 }
371 }
372
373 b = b->next;
374 }
375
376 return b;
377 }
378
379
380 nxt_buf_t *
nxt_sendbuf_completion(nxt_task_t * task,nxt_work_queue_t * wq,nxt_buf_t * b)381 nxt_sendbuf_completion(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b)
382 {
383 while (b != NULL) {
384
385 if (!nxt_buf_is_sync(b) && nxt_buf_used_size(b) != 0) {
386 break;
387 }
388
389 b = nxt_sendbuf_coalesce_completion(task, wq, b);
390 }
391
392 return b;
393 }
394
395
396 void
nxt_sendbuf_drain(nxt_task_t * task,nxt_work_queue_t * wq,nxt_buf_t * b)397 nxt_sendbuf_drain(nxt_task_t *task, nxt_work_queue_t *wq, nxt_buf_t *b)
398 {
399 while (b != NULL) {
400 b = nxt_sendbuf_coalesce_completion(task, wq, b);
401 }
402 }
403
404
405 static nxt_buf_t *
nxt_sendbuf_coalesce_completion(nxt_task_t * task,nxt_work_queue_t * wq,nxt_buf_t * start)406 nxt_sendbuf_coalesce_completion(nxt_task_t *task, nxt_work_queue_t *wq,
407 nxt_buf_t *start)
408 {
409 nxt_buf_t *b, *next, **last, *rest, **last_rest;
410 nxt_work_handler_t handler;
411
412 rest = NULL;
413 last_rest = &rest;
414 last = &start->next;
415 b = start;
416 handler = b->completion_handler;
417
418 for ( ;; ) {
419 next = b->next;
420 if (next == NULL) {
421 break;
422 }
423
424 b->next = NULL;
425 b = next;
426
427 if (!nxt_buf_is_sync(b) && nxt_buf_used_size(b) != 0) {
428 *last_rest = b;
429 break;
430 }
431
432 if (handler == b->completion_handler) {
433 *last = b;
434 last = &b->next;
435
436 } else {
437 *last_rest = b;
438 last_rest = &b->next;
439 }
440 }
441
442 nxt_work_queue_add(wq, handler, task, start, start->parent);
443
444 return rest;
445 }
446