1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7 #include <nxt_main.h>
8
9
10 typedef struct {
11 nxt_job_t job;
12 nxt_buf_t *out;
13 size_t sent;
14 size_t limit;
15 nxt_work_handler_t ready_handler;
16 } nxt_job_sendfile_t;
17
18
19 static void nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj,
20 void *data);
21 static void nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj,
22 void *data);
23 static void nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj,
24 void *data);
25 static nxt_buf_t *nxt_event_conn_job_sendfile_completion(nxt_task_t *task,
26 nxt_conn_t *c, nxt_buf_t *b);
27
28
29 void
nxt_event_conn_job_sendfile(nxt_task_t * task,nxt_conn_t * c)30 nxt_event_conn_job_sendfile(nxt_task_t *task, nxt_conn_t *c)
31 {
32 nxt_fd_event_disable(task->thread->engine, &c->socket);
33
34 /* A work item data is not used in nxt_event_conn_job_sendfile_start(). */
35 nxt_event_conn_job_sendfile_start(task, c, NULL);
36 }
37
38
39 static void
nxt_event_conn_job_sendfile_start(nxt_task_t * task,void * obj,void * data)40 nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj, void *data)
41 {
42 nxt_conn_t *c;
43 nxt_iobuf_t b;
44 nxt_job_sendfile_t *jbs;
45 nxt_sendbuf_coalesce_t sb;
46
47 c = obj;
48
49 nxt_debug(task, "event conn sendfile fd:%d", c->socket.fd);
50
51 jbs = nxt_job_create(c->mem_pool, sizeof(nxt_job_sendfile_t));
52
53 if (nxt_slow_path(jbs == NULL)) {
54 c->write_state->error_handler(task, c, NULL);
55 return;
56 }
57
58 c->socket.write_handler = nxt_event_conn_job_sendfile_start;
59 c->socket.error_handler = c->write_state->error_handler;
60
61 jbs->job.data = c;
62 nxt_job_set_name(&jbs->job, "job sendfile");
63
64 jbs->limit = nxt_event_conn_write_limit(c);
65
66 if (jbs->limit != 0) {
67
68 sb.buf = c->write;
69 sb.iobuf = &b;
70 sb.nmax = 1;
71 sb.sync = 0;
72 sb.size = 0;
73 sb.limit = jbs->limit;
74
75 if (nxt_sendbuf_mem_coalesce(c->socket.task, &sb) != 0 || !sb.sync) {
76
77 jbs->job.thread_pool = c->u.thread_pool;
78 jbs->job.log = c->socket.log;
79 jbs->out = c->write;
80 c->write = NULL;
81 jbs->ready_handler = nxt_event_conn_job_sendfile_return;
82
83 c->block_read = 1;
84 c->block_write = 1;
85
86 nxt_job_start(task, &jbs->job, nxt_event_conn_job_sendfile_handler);
87 return;
88 }
89 }
90
91 nxt_event_conn_job_sendfile_return(task, jbs, c);
92 }
93
94
95 static void
nxt_event_conn_job_sendfile_handler(nxt_task_t * task,void * obj,void * data)96 nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj, void *data)
97 {
98 ssize_t ret;
99 nxt_buf_t *b;
100 nxt_bool_t first;
101 nxt_conn_t *c;
102 nxt_job_sendfile_t *jbs;
103
104 jbs = obj;
105 c = data;
106
107 nxt_debug(task, "event conn job sendfile fd:%d", c->socket.fd);
108
109 first = c->socket.write_ready;
110 b = jbs->out;
111
112 do {
113 ret = c->io->old_sendbuf(c, b, jbs->limit);
114
115 if (ret == NXT_AGAIN) {
116 break;
117 }
118
119 if (nxt_slow_path(ret == NXT_ERROR)) {
120 goto done;
121 }
122
123 jbs->sent += ret;
124 jbs->limit -= ret;
125
126 b = nxt_sendbuf_update(b, ret);
127
128 if (b == NULL) {
129 goto done;
130 }
131
132 if (jbs->limit == 0) {
133
134 if (c->rate == NULL) {
135 jbs->limit = c->max_chunk;
136 goto fast;
137 }
138
139 goto done;
140 }
141
142 } while (c->socket.write_ready);
143
144 if (first && task->thread->thread_pool->work_queue.head != NULL) {
145 goto fast;
146 }
147
148 done:
149
150 nxt_job_return(task, &jbs->job, jbs->ready_handler);
151 return;
152
153 fast:
154
155 nxt_work_set(&jbs->job.work, nxt_event_conn_job_sendfile_handler,
156 jbs->job.task, jbs, c);
157
158 nxt_thread_pool_post(task->thread->thread_pool, &jbs->job.work);
159 }
160
161
162 static void
nxt_event_conn_job_sendfile_return(nxt_task_t * task,void * obj,void * data)163 nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, void *data)
164 {
165 size_t sent;
166 nxt_buf_t *b;
167 nxt_bool_t done;
168 nxt_conn_t *c;
169 nxt_job_sendfile_t *jbs;
170
171 jbs = obj;
172 c = data;
173
174 c->block_read = 0;
175 c->block_write = 0;
176
177 sent = jbs->sent;
178 c->sent += sent;
179
180 nxt_debug(task, "event conn sendfile sent:%z", sent);
181
182 b = jbs->out;
183
184 /* The job must be destroyed before connection error handler. */
185 nxt_job_destroy(task, jbs);
186
187 if (0 /* STUB: c->write_state->process_buffers */) {
188 b = nxt_event_conn_job_sendfile_completion(task, c, b);
189
190 done = (b == NULL);
191
192 /* Add data which might be added after sendfile job has started. */
193 nxt_buf_chain_add(&b, c->write);
194 c->write = b;
195
196 if (done) {
197 /* All data has been sent. */
198
199 if (b != NULL) {
200 /* But new data has been added. */
201 nxt_event_conn_job_sendfile_start(task, c, NULL);
202 }
203
204 return;
205 }
206 }
207
208 if (sent != 0 && c->write_state->timer_autoreset) {
209 nxt_timer_disable(task->thread->engine, &c->write_timer);
210 }
211
212 if (c->socket.error == 0
213 && !nxt_event_conn_write_delayed(task->thread->engine, c, sent))
214 {
215 nxt_conn_timer(task->thread->engine, c, c->write_state,
216 &c->write_timer);
217
218 nxt_fd_event_oneshot_write(task->thread->engine, &c->socket);
219 }
220
221 if (sent != 0) {
222 nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
223 task, c, c->socket.data);
224 /*
225 * Fall through if first operations were
226 * successful but the last one failed.
227 */
228 }
229
230 if (nxt_slow_path(c->socket.error != 0)) {
231 nxt_work_queue_add(c->write_work_queue, c->write_state->error_handler,
232 task, c, c->socket.data);
233 }
234 }
235
236
237 static nxt_buf_t *
nxt_event_conn_job_sendfile_completion(nxt_task_t * task,nxt_conn_t * c,nxt_buf_t * b)238 nxt_event_conn_job_sendfile_completion(nxt_task_t *task, nxt_conn_t *c,
239 nxt_buf_t *b)
240 {
241 while (b != NULL) {
242
243 nxt_prefetch(b->next);
244
245 if (nxt_buf_is_mem(b) && b->mem.pos != b->mem.free) {
246 break;
247
248 } else if (nxt_buf_is_file(b) && b->file_pos != b->file_end) {
249 break;
250 }
251
252 nxt_work_queue_add(c->write_work_queue,
253 b->completion_handler, task, b, b->parent);
254
255 b = b->next;
256 }
257
258 return b;
259 }
260