1
2 /*
3 * Copyright (C) Igor Sysoev
4 * Copyright (C) NGINX, Inc.
5 */
6
7 #include <nxt_main.h>
8
9
10 static void nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data);
11 static void nxt_job_thread_return_handler(nxt_task_t *task, void *obj,
12 void *data);
13
14
15 void *
nxt_job_create(nxt_mp_t * mp,size_t size)16 nxt_job_create(nxt_mp_t *mp, size_t size)
17 {
18 size_t cache_size;
19 nxt_job_t *job;
20
21 if (mp == NULL) {
22 mp = nxt_mp_create(1024, 128, 256, 32);
23 if (nxt_slow_path(mp == NULL)) {
24 return NULL;
25 }
26
27 job = nxt_mp_zget(mp, size);
28 cache_size = 0;
29
30 } else {
31 job = nxt_mp_zalloc(mp, size);
32 cache_size = size;
33 }
34
35 if (nxt_fast_path(job != NULL)) {
36 job->cache_size = (uint16_t) cache_size;
37 job->mem_pool = mp;
38 nxt_job_set_name(job, "job");
39 }
40
41 /* Allow safe nxt_queue_remove() in nxt_job_destroy(). */
42 nxt_queue_self(&job->link);
43
44 return job;
45 }
46
47
48 void
nxt_job_init(nxt_job_t * job,size_t size)49 nxt_job_init(nxt_job_t *job, size_t size)
50 {
51 nxt_memzero(job, size);
52
53 nxt_job_set_name(job, "job");
54
55 nxt_queue_self(&job->link);
56 }
57
58
59 void
nxt_job_destroy(nxt_task_t * task,void * data)60 nxt_job_destroy(nxt_task_t *task, void *data)
61 {
62 nxt_job_t *job;
63
64 job = data;
65
66 nxt_queue_remove(&job->link);
67
68 if (job->cache_size == 0) {
69
70 if (job->mem_pool != NULL) {
71 nxt_mp_destroy(job->mem_pool);
72 }
73
74 } else {
75 nxt_mp_free(job->mem_pool, job);
76 }
77 }
78
79
80 #if 0
81
82 nxt_int_t
83 nxt_job_cleanup_add(nxt_mp_t *mp, nxt_job_t *job)
84 {
85 nxt_mem_pool_cleanup_t *mpcl;
86
87 mpcl = nxt_mem_pool_cleanup(mp, 0);
88
89 if (nxt_fast_path(mpcl != NULL)) {
90 mpcl->handler = nxt_job_destroy;
91 mpcl->data = job;
92 return NXT_OK;
93 }
94
95 return NXT_ERROR;
96 }
97
98 #endif
99
100
/*
 * The (void *) casts of the handlers stored in the work items that are
 * posted with nxt_thread_pool_post() and nxt_event_engine_post(), and the
 * casts back to "nxt_work_handler_t", are required by Sun C.
 */
105
106 void
nxt_job_start(nxt_task_t * task,nxt_job_t * job,nxt_work_handler_t handler)107 nxt_job_start(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler)
108 {
109 nxt_debug(task, "%s start", job->name);
110
111 if (job->thread_pool != NULL) {
112 nxt_int_t ret;
113
114 job->engine = task->thread->engine;
115
116 nxt_work_set(&job->work, nxt_job_thread_trampoline,
117 job->task, job, (void *) handler);
118
119 ret = nxt_thread_pool_post(job->thread_pool, &job->work);
120
121 if (ret == NXT_OK) {
122 return;
123 }
124
125 handler = job->abort_handler;
126 }
127
128 handler(job->task, job, job->data);
129 }
130
131
132 /* A trampoline function is called by a thread pool thread. */
133
134 static void
nxt_job_thread_trampoline(nxt_task_t * task,void * obj,void * data)135 nxt_job_thread_trampoline(nxt_task_t *task, void *obj, void *data)
136 {
137 nxt_job_t *job;
138 nxt_work_handler_t handler;
139
140 job = obj;
141 handler = (nxt_work_handler_t) data;
142
143 nxt_debug(task, "%s thread", job->name);
144
145 if (nxt_slow_path(job->cancel)) {
146 nxt_job_return(task, job, job->abort_handler);
147
148 } else {
149 handler(job->task, job, job->data);
150 }
151 }
152
153
154 void
nxt_job_return(nxt_task_t * task,nxt_job_t * job,nxt_work_handler_t handler)155 nxt_job_return(nxt_task_t *task, nxt_job_t *job, nxt_work_handler_t handler)
156 {
157 nxt_debug(task, "%s return", job->name);
158
159 if (job->engine != NULL) {
160 /* A return function is called in thread pool thread context. */
161
162 nxt_work_set(&job->work, nxt_job_thread_return_handler,
163 job->task, job, (void *) handler);
164
165 nxt_event_engine_post(job->engine, &job->work);
166
167 return;
168 }
169
170 if (nxt_slow_path(job->cancel)) {
171 nxt_debug(task, "%s cancellation", job->name);
172 handler = job->abort_handler;
173 }
174
175 nxt_work_queue_add(&task->thread->engine->fast_work_queue,
176 handler, job->task, job, job->data);
177 }
178
179
180 static void
nxt_job_thread_return_handler(nxt_task_t * task,void * obj,void * data)181 nxt_job_thread_return_handler(nxt_task_t *task, void *obj, void *data)
182 {
183 nxt_job_t *job;
184 nxt_work_handler_t handler;
185
186 job = obj;
187 handler = (nxt_work_handler_t) data;
188
189 job->task->thread = task->thread;
190
191 if (nxt_slow_path(job->cancel)) {
192 nxt_debug(task, "%s cancellation", job->name);
193 handler = job->abort_handler;
194 }
195
196 handler(job->task, job, job->data);
197 }
198