1 /* Background I/O service for Redis.
2  *
3  * This file implements operations that we need to perform in the background.
4  * Currently there is only a single operation, that is a background close(2)
5  * system call. This is needed as when the process is the last owner of a
6  * reference to a file closing it means unlinking it, and the deletion of the
7  * file is slow, blocking the server.
8  *
9  * In the future we'll either continue implementing new things we need or
10  * we'll switch to libeio. However there are probably long term uses for this
11  * file as we may want to put here Redis specific background tasks (for instance
12  * it is not impossible that we'll need a non blocking FLUSHDB/FLUSHALL
13  * implementation).
14  *
15  * DESIGN
16  * ------
17  *
18  * The design is trivial, we have a structure representing a job to perform
19  * and a different thread and job queue for every job type.
20  * Every thread waits for new jobs in its queue, and process every job
21  * sequentially.
22  *
23  * Jobs of the same type are guaranteed to be processed from the least
24  * recently inserted to the most recently inserted (older jobs processed
25  * first).
26  *
27  * Currently there is no way for the creator of the job to be notified about
28  * the completion of the operation, this will only be added when/if needed.
29  *
30  * ----------------------------------------------------------------------------
31  *
32  * Copyright (c) 2009-2012, Salvatore Sanfilippo <antirez at gmail dot com>
33  * All rights reserved.
34  *
35  * Redistribution and use in source and binary forms, with or without
36  * modification, are permitted provided that the following conditions are met:
37  *
38  *   * Redistributions of source code must retain the above copyright notice,
39  *     this list of conditions and the following disclaimer.
40  *   * Redistributions in binary form must reproduce the above copyright
41  *     notice, this list of conditions and the following disclaimer in the
42  *     documentation and/or other materials provided with the distribution.
43  *   * Neither the name of Redis nor the names of its contributors may be used
44  *     to endorse or promote products derived from this software without
45  *     specific prior written permission.
46  *
47  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
48  * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
49  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
50  * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
51  * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
52  * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
53  * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
54  * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
55  * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
56  * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
57  * POSSIBILITY OF SUCH DAMAGE.
58  */
59 
60 
61 #include "server.h"
62 #include "bio.h"
63 
/* Per-job-type worker state: one thread, one mutex, one condition pair and
 * one job queue for every BIO_* operation type. */
static pthread_t bio_threads[BIO_NUM_OPS];          /* Worker thread handles. */
static pthread_mutex_t bio_mutex[BIO_NUM_OPS];      /* Protects queue + counter. */
static pthread_cond_t bio_newjob_cond[BIO_NUM_OPS]; /* Signaled on job submission. */
static pthread_cond_t bio_step_cond[BIO_NUM_OPS];   /* Broadcast after each job. */
static list *bio_jobs[BIO_NUM_OPS];                 /* FIFO queue of bio_job*. */
/* The following array is used to hold the number of pending jobs for every
 * OP type. This allows us to export the bioPendingJobsOfType() API that is
 * useful when the main thread wants to perform some operation that may involve
 * objects shared with the background thread. The main thread will just wait
 * that there are no longer jobs of this type to be executed before performing
 * the sensible operation. This data is also useful for reporting. */
static unsigned long long bio_pending[BIO_NUM_OPS];

/* This structure represents a background Job. It is only used locally to this
 * file as the API does not expose the internals at all. */
struct bio_job {
    /* Job specific arguments.*/
    int fd; /* Fd for file based background jobs */
    lazy_free_fn *free_fn; /* Function that will free the provided arguments */
    void *free_args[]; /* List of arguments to be passed to the free function
                        * (flexible array member, sized at allocation time). */
};
85 
86 void *bioProcessBackgroundJobs(void *arg);
87 
88 /* Make sure we have enough stack to perform all the things we do in the
89  * main thread. */
90 #define REDIS_THREAD_STACK_SIZE (1024*1024*4)
91 
92 /* Initialize the background system, spawning the thread. */
bioInit(void)93 void bioInit(void) {
94     pthread_attr_t attr;
95     pthread_t thread;
96     size_t stacksize;
97     int j;
98 
99     /* Initialization of state vars and objects */
100     for (j = 0; j < BIO_NUM_OPS; j++) {
101         pthread_mutex_init(&bio_mutex[j],NULL);
102         pthread_cond_init(&bio_newjob_cond[j],NULL);
103         pthread_cond_init(&bio_step_cond[j],NULL);
104         bio_jobs[j] = listCreate();
105         bio_pending[j] = 0;
106     }
107 
108     /* Set the stack size as by default it may be small in some system */
109     pthread_attr_init(&attr);
110     pthread_attr_getstacksize(&attr,&stacksize);
111     if (!stacksize) stacksize = 1; /* The world is full of Solaris Fixes */
112     while (stacksize < REDIS_THREAD_STACK_SIZE) stacksize *= 2;
113     pthread_attr_setstacksize(&attr, stacksize);
114 
115     /* Ready to spawn our threads. We use the single argument the thread
116      * function accepts in order to pass the job ID the thread is
117      * responsible of. */
118     for (j = 0; j < BIO_NUM_OPS; j++) {
119         void *arg = (void*)(unsigned long) j;
120         if (pthread_create(&thread,&attr,bioProcessBackgroundJobs,arg) != 0) {
121             serverLog(LL_WARNING,"Fatal: Can't initialize Background Jobs.");
122             exit(1);
123         }
124         bio_threads[j] = thread;
125     }
126 }
127 
/* Append a job to the queue of the given type and wake up the worker.
 * Takes ownership of 'job' (it will be freed by the worker thread).
 * The pending counter is bumped under the same lock as the queue so the
 * two never disagree from the point of view of other threads. */
void bioSubmitJob(int type, struct bio_job *job) {
    pthread_mutex_lock(&bio_mutex[type]);
    listAddNodeTail(bio_jobs[type],job);
    bio_pending[type]++;
    pthread_cond_signal(&bio_newjob_cond[type]);
    pthread_mutex_unlock(&bio_mutex[type]);
}
135 
/* Queue a background lazy-free job: 'free_fn' will be invoked by the
 * BIO_LAZY_FREE worker with the 'arg_count' pointers passed as varargs. */
void bioCreateLazyFreeJob(lazy_free_fn free_fn, int arg_count, ...) {
    va_list ap;
    /* A single allocation holds the job header plus the trailing
     * argument array (flexible array member). */
    struct bio_job *job = zmalloc(sizeof(*job) + sizeof(void *) * (arg_count));
    job->free_fn = free_fn;

    va_start(ap, arg_count);
    int i = 0;
    while (i < arg_count) {
        job->free_args[i] = va_arg(ap, void *);
        i++;
    }
    va_end(ap);
    bioSubmitJob(BIO_LAZY_FREE, job);
}
150 
bioCreateCloseJob(int fd)151 void bioCreateCloseJob(int fd) {
152     struct bio_job *job = zmalloc(sizeof(*job));
153     job->fd = fd;
154 
155     bioSubmitJob(BIO_CLOSE_FILE, job);
156 }
157 
bioCreateFsyncJob(int fd)158 void bioCreateFsyncJob(int fd) {
159     struct bio_job *job = zmalloc(sizeof(*job));
160     job->fd = fd;
161 
162     bioSubmitJob(BIO_AOF_FSYNC, job);
163 }
164 
/* Worker thread entry point. 'arg' encodes the job type this thread serves.
 * Loops forever: waits for jobs of its type and processes them in FIFO
 * order, one at a time. Never returns under normal operation; the thread is
 * stopped only via pthread_cancel() from bioKillThreads(). */
void *bioProcessBackgroundJobs(void *arg) {
    struct bio_job *job;
    unsigned long type = (unsigned long) arg;
    sigset_t sigset;

    /* Check that the type is within the right interval. */
    if (type >= BIO_NUM_OPS) {
        serverLog(LL_WARNING,
            "Warning: bio thread started with wrong type %lu",type);
        return NULL;
    }

    /* Name the thread after its job type, for ps/top/gdb friendliness. */
    switch (type) {
    case BIO_CLOSE_FILE:
        redis_set_thread_title("bio_close_file");
        break;
    case BIO_AOF_FSYNC:
        redis_set_thread_title("bio_aof_fsync");
        break;
    case BIO_LAZY_FREE:
        redis_set_thread_title("bio_lazy_free");
        break;
    }

    redisSetCpuAffinity(server.bio_cpulist);

    makeThreadKillable();

    pthread_mutex_lock(&bio_mutex[type]);
    /* Block SIGALRM so we are sure that only the main thread will
     * receive the watchdog signal. */
    sigemptyset(&sigset);
    sigaddset(&sigset, SIGALRM);
    if (pthread_sigmask(SIG_BLOCK, &sigset, NULL))
        serverLog(LL_WARNING,
            "Warning: can't mask SIGALRM in bio.c thread: %s", strerror(errno));

    while(1) {
        listNode *ln;

        /* The loop always starts with the lock hold. */
        if (listLength(bio_jobs[type]) == 0) {
            pthread_cond_wait(&bio_newjob_cond[type],&bio_mutex[type]);
            continue;
        }
        /* Pop the job from the queue. */
        ln = listFirst(bio_jobs[type]);
        job = ln->value;
        /* It is now possible to unlock the background system as we know have
         * a stand alone job structure to process.*/
        pthread_mutex_unlock(&bio_mutex[type]);

        /* Process the job accordingly to its type. */
        if (type == BIO_CLOSE_FILE) {
            close(job->fd);
        } else if (type == BIO_AOF_FSYNC) {
            /* The fd may be closed by main thread and reused for another
             * socket, pipe, or file. We just ignore these errno because
             * aof fsync did not really fail. */
            if (redis_fsync(job->fd) == -1 &&
                errno != EBADF && errno != EINVAL)
            {
                int last_status;
                atomicGet(server.aof_bio_fsync_status,last_status);
                atomicSet(server.aof_bio_fsync_status,C_ERR);
                atomicSet(server.aof_bio_fsync_errno,errno);
                /* Log only on the OK->ERR transition to avoid flooding the
                 * log while the error condition persists. */
                if (last_status == C_OK) {
                    serverLog(LL_WARNING,
                        "Fail to fsync the AOF file: %s",strerror(errno));
                }
            } else {
                atomicSet(server.aof_bio_fsync_status,C_OK);
            }
        } else if (type == BIO_LAZY_FREE) {
            job->free_fn(job->free_args);
        } else {
            serverPanic("Wrong job type in bioProcessBackgroundJobs().");
        }
        zfree(job);

        /* Lock again before reiterating the loop, if there are no longer
         * jobs to process we'll block again in pthread_cond_wait().
         * Note the node is removed (and the pending counter decremented)
         * only AFTER the job ran, so bioPendingJobsOfType() counts the
         * in-progress job as still pending. */
        pthread_mutex_lock(&bio_mutex[type]);
        listDelNode(bio_jobs[type],ln);
        bio_pending[type]--;

        /* Unblock threads blocked on bioWaitStepOfType() if any. */
        pthread_cond_broadcast(&bio_step_cond[type]);
    }
}
255 
256 /* Return the number of pending jobs of the specified type. */
bioPendingJobsOfType(int type)257 unsigned long long bioPendingJobsOfType(int type) {
258     unsigned long long val;
259     pthread_mutex_lock(&bio_mutex[type]);
260     val = bio_pending[type];
261     pthread_mutex_unlock(&bio_mutex[type]);
262     return val;
263 }
264 
265 /* If there are pending jobs for the specified type, the function blocks
266  * and waits that the next job was processed. Otherwise the function
267  * does not block and returns ASAP.
268  *
269  * The function returns the number of jobs still to process of the
270  * requested type.
271  *
272  * This function is useful when from another thread, we want to wait
273  * a bio.c thread to do more work in a blocking way.
274  */
bioWaitStepOfType(int type)275 unsigned long long bioWaitStepOfType(int type) {
276     unsigned long long val;
277     pthread_mutex_lock(&bio_mutex[type]);
278     val = bio_pending[type];
279     if (val != 0) {
280         pthread_cond_wait(&bio_step_cond[type],&bio_mutex[type]);
281         val = bio_pending[type];
282     }
283     pthread_mutex_unlock(&bio_mutex[type]);
284     return val;
285 }
286 
287 /* Kill the running bio threads in an unclean way. This function should be
288  * used only when it's critical to stop the threads for some reason.
289  * Currently Redis does this only on crash (for instance on SIGSEGV) in order
290  * to perform a fast memory check without other threads messing with memory. */
bioKillThreads(void)291 void bioKillThreads(void) {
292     int err, j;
293 
294     for (j = 0; j < BIO_NUM_OPS; j++) {
295         if (bio_threads[j] == pthread_self()) continue;
296         if (bio_threads[j] && pthread_cancel(bio_threads[j]) == 0) {
297             if ((err = pthread_join(bio_threads[j],NULL)) != 0) {
298                 serverLog(LL_WARNING,
299                     "Bio thread for job type #%d can not be joined: %s",
300                         j, strerror(err));
301             } else {
302                 serverLog(LL_WARNING,
303                     "Bio thread for job type #%d terminated",j);
304             }
305         }
306     }
307 }
308