1 
2 /*
3  * Copyright (C) Igor Sysoev
4  * Copyright (C) NGINX, Inc.
5  */
6 
7 #include <nxt_main.h>
8 
9 
10 typedef struct {
11     nxt_job_t           job;
12     nxt_buf_t           *out;
13     size_t              sent;
14     size_t              limit;
15     nxt_work_handler_t  ready_handler;
16 } nxt_job_sendfile_t;
17 
18 
19 static void nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj,
20     void *data);
21 static void nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj,
22     void *data);
23 static void nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj,
24     void *data);
25 static nxt_buf_t *nxt_event_conn_job_sendfile_completion(nxt_task_t *task,
26     nxt_conn_t *c, nxt_buf_t *b);
27 
28 
29 void
nxt_event_conn_job_sendfile(nxt_task_t * task,nxt_conn_t * c)30 nxt_event_conn_job_sendfile(nxt_task_t *task, nxt_conn_t *c)
31 {
32     nxt_fd_event_disable(task->thread->engine, &c->socket);
33 
34     /* A work item data is not used in nxt_event_conn_job_sendfile_start(). */
35     nxt_event_conn_job_sendfile_start(task, c, NULL);
36 }
37 
38 
39 static void
nxt_event_conn_job_sendfile_start(nxt_task_t * task,void * obj,void * data)40 nxt_event_conn_job_sendfile_start(nxt_task_t *task, void *obj, void *data)
41 {
42     nxt_conn_t              *c;
43     nxt_iobuf_t             b;
44     nxt_job_sendfile_t      *jbs;
45     nxt_sendbuf_coalesce_t  sb;
46 
47     c = obj;
48 
49     nxt_debug(task, "event conn sendfile fd:%d", c->socket.fd);
50 
51     jbs = nxt_job_create(c->mem_pool, sizeof(nxt_job_sendfile_t));
52 
53     if (nxt_slow_path(jbs == NULL)) {
54         c->write_state->error_handler(task, c, NULL);
55         return;
56     }
57 
58     c->socket.write_handler = nxt_event_conn_job_sendfile_start;
59     c->socket.error_handler = c->write_state->error_handler;
60 
61     jbs->job.data = c;
62     nxt_job_set_name(&jbs->job, "job sendfile");
63 
64     jbs->limit = nxt_event_conn_write_limit(c);
65 
66     if (jbs->limit != 0) {
67 
68         sb.buf = c->write;
69         sb.iobuf = &b;
70         sb.nmax = 1;
71         sb.sync = 0;
72         sb.size = 0;
73         sb.limit = jbs->limit;
74 
75         if (nxt_sendbuf_mem_coalesce(c->socket.task, &sb) != 0 || !sb.sync) {
76 
77             jbs->job.thread_pool = c->u.thread_pool;
78             jbs->job.log = c->socket.log;
79             jbs->out = c->write;
80             c->write = NULL;
81             jbs->ready_handler = nxt_event_conn_job_sendfile_return;
82 
83             c->block_read = 1;
84             c->block_write = 1;
85 
86             nxt_job_start(task, &jbs->job, nxt_event_conn_job_sendfile_handler);
87             return;
88         }
89     }
90 
91     nxt_event_conn_job_sendfile_return(task, jbs, c);
92 }
93 
94 
95 static void
nxt_event_conn_job_sendfile_handler(nxt_task_t * task,void * obj,void * data)96 nxt_event_conn_job_sendfile_handler(nxt_task_t *task, void *obj, void *data)
97 {
98     ssize_t             ret;
99     nxt_buf_t           *b;
100     nxt_bool_t          first;
101     nxt_conn_t          *c;
102     nxt_job_sendfile_t  *jbs;
103 
104     jbs = obj;
105     c = data;
106 
107     nxt_debug(task, "event conn job sendfile fd:%d", c->socket.fd);
108 
109     first = c->socket.write_ready;
110     b = jbs->out;
111 
112     do {
113         ret = c->io->old_sendbuf(c, b, jbs->limit);
114 
115         if (ret == NXT_AGAIN) {
116             break;
117         }
118 
119         if (nxt_slow_path(ret == NXT_ERROR)) {
120             goto done;
121         }
122 
123         jbs->sent += ret;
124         jbs->limit -= ret;
125 
126         b = nxt_sendbuf_update(b, ret);
127 
128         if (b == NULL) {
129             goto done;
130         }
131 
132         if (jbs->limit == 0) {
133 
134             if (c->rate == NULL) {
135                 jbs->limit = c->max_chunk;
136                 goto fast;
137             }
138 
139             goto done;
140         }
141 
142     } while (c->socket.write_ready);
143 
144     if (first && task->thread->thread_pool->work_queue.head != NULL) {
145         goto fast;
146     }
147 
148 done:
149 
150     nxt_job_return(task, &jbs->job, jbs->ready_handler);
151     return;
152 
153 fast:
154 
155     nxt_work_set(&jbs->job.work, nxt_event_conn_job_sendfile_handler,
156                  jbs->job.task, jbs, c);
157 
158     nxt_thread_pool_post(task->thread->thread_pool, &jbs->job.work);
159 }
160 
161 
162 static void
nxt_event_conn_job_sendfile_return(nxt_task_t * task,void * obj,void * data)163 nxt_event_conn_job_sendfile_return(nxt_task_t *task, void *obj, void *data)
164 {
165     size_t              sent;
166     nxt_buf_t           *b;
167     nxt_bool_t          done;
168     nxt_conn_t          *c;
169     nxt_job_sendfile_t  *jbs;
170 
171     jbs = obj;
172     c = data;
173 
174     c->block_read = 0;
175     c->block_write = 0;
176 
177     sent = jbs->sent;
178     c->sent += sent;
179 
180     nxt_debug(task, "event conn sendfile sent:%z", sent);
181 
182     b = jbs->out;
183 
184     /* The job must be destroyed before connection error handler. */
185     nxt_job_destroy(task, jbs);
186 
187     if (0 /* STUB: c->write_state->process_buffers */) {
188         b = nxt_event_conn_job_sendfile_completion(task, c, b);
189 
190         done = (b == NULL);
191 
192         /* Add data which might be added after sendfile job has started. */
193         nxt_buf_chain_add(&b, c->write);
194         c->write = b;
195 
196         if (done) {
197             /* All data has been sent. */
198 
199             if (b != NULL) {
200                 /* But new data has been added. */
201                 nxt_event_conn_job_sendfile_start(task, c, NULL);
202             }
203 
204             return;
205         }
206     }
207 
208     if (sent != 0 && c->write_state->timer_autoreset) {
209         nxt_timer_disable(task->thread->engine, &c->write_timer);
210     }
211 
212     if (c->socket.error == 0
213         && !nxt_event_conn_write_delayed(task->thread->engine, c, sent))
214     {
215         nxt_conn_timer(task->thread->engine, c, c->write_state,
216                        &c->write_timer);
217 
218         nxt_fd_event_oneshot_write(task->thread->engine, &c->socket);
219     }
220 
221     if (sent != 0) {
222         nxt_work_queue_add(c->write_work_queue, c->write_state->ready_handler,
223                            task, c, c->socket.data);
224         /*
225          * Fall through if first operations were
226          * successful but the last one failed.
227          */
228     }
229 
230     if (nxt_slow_path(c->socket.error != 0)) {
231         nxt_work_queue_add(c->write_work_queue, c->write_state->error_handler,
232                            task, c, c->socket.data);
233     }
234 }
235 
236 
237 static nxt_buf_t *
nxt_event_conn_job_sendfile_completion(nxt_task_t * task,nxt_conn_t * c,nxt_buf_t * b)238 nxt_event_conn_job_sendfile_completion(nxt_task_t *task, nxt_conn_t *c,
239     nxt_buf_t *b)
240 {
241     while (b != NULL) {
242 
243         nxt_prefetch(b->next);
244 
245         if (nxt_buf_is_mem(b) && b->mem.pos != b->mem.free) {
246             break;
247 
248         } else if (nxt_buf_is_file(b) && b->file_pos != b->file_end) {
249             break;
250         }
251 
252         nxt_work_queue_add(c->write_work_queue,
253                            b->completion_handler, task, b, b->parent);
254 
255         b = b->next;
256     }
257 
258     return b;
259 }
260