1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 */
18 #ifndef THREADED
19 #define THREADED
20 #endif
21
22 #if !defined(DLL_EXPORT) && !defined(USE_STATIC_LIB)
23 # define USE_STATIC_LIB
24 #endif
25
26 #ifndef _GNU_SOURCE
27 #define _GNU_SOURCE
28 #endif
29
30 #include "zk_adaptor.h"
31 #include "zookeeper_log.h"
32
33 #include <stdlib.h>
34 #include <stdio.h>
35 #include <time.h>
36 #include <fcntl.h>
37 #include <assert.h>
38 #include <errno.h>
39
40 #ifndef WIN32
41 #include <signal.h>
42 #include <poll.h>
43 #include <unistd.h>
44 #include <sys/time.h>
45 #endif
46
/* Lock the mutex protecting the handle's authentication state (zh->auth_h).
 * Returns the pthread_mutex_lock() result. */
int zoo_lock_auth(zhandle_t *zh)
{
    pthread_mutex_t *auth_mutex = &zh->auth_h.lock;
    return pthread_mutex_lock(auth_mutex);
}

/* Unlock the mutex protecting zh->auth_h.
 * Returns the pthread_mutex_unlock() result. */
int zoo_unlock_auth(zhandle_t *zh)
{
    pthread_mutex_t *auth_mutex = &zh->auth_h.lock;
    return pthread_mutex_unlock(auth_mutex);
}

/* Lock a buffer list; returns the pthread_mutex_lock() result. */
int lock_buffer_list(buffer_head_t *l)
{
    pthread_mutex_t *list_mutex = &l->lock;
    return pthread_mutex_lock(list_mutex);
}

/* Unlock a buffer list; returns the pthread_mutex_unlock() result. */
int unlock_buffer_list(buffer_head_t *l)
{
    pthread_mutex_t *list_mutex = &l->lock;
    return pthread_mutex_unlock(list_mutex);
}

/* Lock a completion list; returns the pthread_mutex_lock() result. */
int lock_completion_list(completion_head_t *l)
{
    pthread_mutex_t *list_mutex = &l->lock;
    return pthread_mutex_lock(list_mutex);
}

/* Wake every thread waiting on the list's condition variable, then unlock
 * the list. The broadcast is issued while the mutex is still held.
 * Returns the pthread_mutex_unlock() result. */
int unlock_completion_list(completion_head_t *l)
{
    pthread_mutex_t *list_mutex = &l->lock;
    pthread_cond_broadcast(&l->cond);
    return pthread_mutex_unlock(list_mutex);
}
alloc_sync_completion(void)72 struct sync_completion *alloc_sync_completion(void)
73 {
74 struct sync_completion *sc = (struct sync_completion*)calloc(1, sizeof(struct sync_completion));
75 if (sc) {
76 pthread_cond_init(&sc->cond, 0);
77 pthread_mutex_init(&sc->lock, 0);
78 }
79 return sc;
80 }
wait_sync_completion(struct sync_completion * sc)81 int wait_sync_completion(struct sync_completion *sc)
82 {
83 pthread_mutex_lock(&sc->lock);
84 while (!sc->complete) {
85 pthread_cond_wait(&sc->cond, &sc->lock);
86 }
87 pthread_mutex_unlock(&sc->lock);
88 return 0;
89 }
90
free_sync_completion(struct sync_completion * sc)91 void free_sync_completion(struct sync_completion *sc)
92 {
93 if (sc) {
94 pthread_mutex_destroy(&sc->lock);
95 pthread_cond_destroy(&sc->cond);
96 free(sc);
97 }
98 }
99
notify_sync_completion(struct sync_completion * sc)100 void notify_sync_completion(struct sync_completion *sc)
101 {
102 pthread_mutex_lock(&sc->lock);
103 sc->complete = 1;
104 pthread_cond_broadcast(&sc->cond);
105 pthread_mutex_unlock(&sc->lock);
106 }
107
/*
 * In the multithreaded adaptor this hook does nothing: the argument is
 * ignored and 0 is always returned.
 */
int process_async(int outstanding_sync)
{
    (void)outstanding_sync; /* intentionally unused */
    return 0;
}
112
113 #ifdef WIN32
114 unsigned __stdcall do_io( void * );
115 unsigned __stdcall do_completion( void * );
116
/*
 * Windows helper: log `message` together with the current WSAGetLastError()
 * code, close `sock`, and return -1 so callers can write
 * `return handle_error(...)`. The error code is read before closesocket().
 */
int handle_error(zhandle_t* zh, SOCKET sock, char* message)
{
    int wsa_error = WSAGetLastError();

    LOG_ERROR(LOGCALLBACK(zh), "%s. %d", message, wsa_error);
    closesocket(sock);
    return -1;
}
123
124 //--create socket pair for interupting selects.
create_socket_pair(zhandle_t * zh,SOCKET fds[2])125 int create_socket_pair(zhandle_t* zh, SOCKET fds[2])
126 {
127 struct sockaddr_in inaddr;
128 struct sockaddr addr;
129 int yes=1;
130 int len=0;
131
132 SOCKET lst=socket(AF_INET, SOCK_STREAM,IPPROTO_TCP);
133 if (lst == INVALID_SOCKET ){
134 LOG_ERROR(LOGCALLBACK(zh), "Error creating socket. %d",WSAGetLastError());
135 return -1;
136 }
137 memset(&inaddr, 0, sizeof(inaddr));
138 memset(&addr, 0, sizeof(addr));
139 inaddr.sin_family = AF_INET;
140 inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
141 inaddr.sin_port = 0; //--system assigns the port
142
143 if ( setsockopt(lst,SOL_SOCKET,SO_REUSEADDR,(char*)&yes,sizeof(yes)) == SOCKET_ERROR ) {
144 return handle_error(zh, lst,"Error trying to set socket option.");
145 }
146 if (bind(lst,(struct sockaddr *)&inaddr,sizeof(inaddr)) == SOCKET_ERROR){
147 return handle_error(zh, lst,"Error trying to bind socket.");
148 }
149 if (listen(lst,1) == SOCKET_ERROR){
150 return handle_error(zh, lst,"Error trying to listen on socket.");
151 }
152 len=sizeof(inaddr);
153 getsockname(lst, &addr,&len);
154 fds[0]=socket(AF_INET, SOCK_STREAM,0);
155 if (connect(fds[0],&addr,len) == SOCKET_ERROR){
156 return handle_error(zh, lst, "Error while connecting to socket.");
157 }
158 if ((fds[1]=accept(lst,0,0)) == INVALID_SOCKET){
159 closesocket(fds[0]);
160 return handle_error(zh, lst, "Error while accepting socket connection.");
161 }
162 closesocket(lst);
163 return 0;
164 }
165 #else
166 void *do_io(void *);
167 void *do_completion(void *);
168 #endif
169
170
171 int wakeup_io_thread(zhandle_t *zh);
172
#ifdef WIN32
/*
 * Put a socket into non-blocking mode via ioctlsocket(FIONBIO).
 * Returns 0 on success and -1 on failure (same convention as the POSIX
 * variant below).
 */
static int set_nonblock(SOCKET fd){
    ULONG nonblocking_flag = 1;
    return ioctlsocket(fd, FIONBIO, &nonblocking_flag) == 0 ? 0 : -1;
}
#else
/*
 * Set O_NONBLOCK on a file descriptor, preserving its other status flags.
 * Returns 0 on success (including when the flag was already set) and -1 on
 * failure, with errno set by fcntl().
 */
static int set_nonblock(int fd){
    int flags = fcntl(fd, F_GETFL);
    // Check for failure first: -1 has every bit set, so testing it against
    // O_NONBLOCK would wrongly report an invalid fd as "already non-blocking".
    if (flags == -1)
        return -1;
    if (flags & O_NONBLOCK)
        return 0;
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1 ? -1 : 0;
}
#endif
188
/*
 * Block the caller until adaptor->threadsToWait reaches zero, i.e. until
 * every thread counted by start_threads() has called notify_thread_ready().
 * The counter is re-checked after each wake-up.
 */
void wait_for_others(zhandle_t* zh)
{
    struct adaptor_threads *barrier = zh->adaptor_priv;

    pthread_mutex_lock(&barrier->lock);
    while (barrier->threadsToWait > 0) {
        pthread_cond_wait(&barrier->cond, &barrier->lock);
    }
    pthread_mutex_unlock(&barrier->lock);
}
197
/*
 * Called by each adaptor thread once it has started: decrement the start-up
 * counter, wake everyone waiting on the barrier, then block until the
 * counter reaches zero so all threads proceed together.
 */
void notify_thread_ready(zhandle_t* zh)
{
    struct adaptor_threads *barrier = zh->adaptor_priv;

    pthread_mutex_lock(&barrier->lock);
    barrier->threadsToWait -= 1;
    pthread_cond_broadcast(&barrier->cond);
    while (barrier->threadsToWait > 0) {
        pthread_cond_wait(&barrier->cond, &barrier->lock);
    }
    pthread_mutex_unlock(&barrier->lock);
}
208
209
/*
 * Initialise the start-up barrier, spawn the IO thread (do_io) and the
 * completion thread (do_completion), and block until both have passed
 * notify_thread_ready().
 *
 * NOTE(review): pthread_create() failures are only caught by assert(). In an
 * NDEBUG build a failed create leaves threadsToWait above zero, so
 * wait_for_others() (and the surviving thread) would block indefinitely.
 */
void start_threads(zhandle_t* zh)
{
    struct adaptor_threads *adaptor = zh->adaptor_priv;
    int rc = 0;

    pthread_cond_init(&adaptor->cond, 0);
    pthread_mutex_init(&adaptor->lock, 0);
    adaptor->threadsToWait = 2; /* IO thread + completion thread */

    /* Hold an API reference so the zhandle cannot be destroyed while
     * initialisation is still in progress. */
    api_prolog(zh);
    LOG_DEBUG(LOGCALLBACK(zh), "starting threads...");

    rc = pthread_create(&adaptor->io, 0, do_io, zh);
    assert("pthread_create() failed for the IO thread"&&!rc);

    rc = pthread_create(&adaptor->completion, 0, do_completion, zh);
    assert("pthread_create() failed for the completion thread"&&!rc);

    wait_for_others(zh);
    api_epilog(zh, 0);
}
229
/*
 * Allocate the multithreaded adaptor state for `zh`. The function:
 *   - creates the IO thread's wake-up channel,
 *   - initialises the handle's locks and condition variables
 *     (to_send.lock is recursive),
 *   - attaches the state to zh->adaptor_priv,
 *   - starts the IO and completion threads.
 *
 * Returns 0 on success. Returns -1 if memory or the wake-up channel cannot
 * be obtained; in that case nothing is attached to zh.
 */
int adaptor_init(zhandle_t *zh)
{
    pthread_mutexattr_t recursive_attr;
    int channel_rc;
    struct adaptor_threads *threads = calloc(1, sizeof(*threads));

    if (threads == NULL) {
        LOG_ERROR(LOGCALLBACK(zh), "Out of memory");
        return -1;
    }

    /* Wake-up channel used to interrupt the IO thread's wait: a pipe on
     * unix/sol, a loopback socket pair on Windows. */
#ifdef WIN32
    channel_rc = create_socket_pair(zh, threads->self_pipe);
    if (channel_rc == -1) {
        LOG_ERROR(LOGCALLBACK(zh), "Can't make a socket.");
    }
#else
    channel_rc = pipe(threads->self_pipe);
    if (channel_rc == -1) {
        LOG_ERROR(LOGCALLBACK(zh), "Can't make a pipe %d",errno);
    }
#endif
    if (channel_rc == -1) {
        free(threads);
        return -1;
    }
    set_nonblock(threads->self_pipe[1]);
    set_nonblock(threads->self_pipe[0]);

    pthread_mutex_init(&zh->auth_h.lock, 0);

    zh->adaptor_priv = threads;
    pthread_mutex_init(&zh->to_process.lock, 0);
    pthread_mutex_init(&threads->zh_lock, 0);
    pthread_mutex_init(&threads->reconfig_lock, 0);
    pthread_mutex_init(&threads->watchers_lock, 0);

    /* to_send must be a recursive mutex */
    pthread_mutexattr_init(&recursive_attr);
    pthread_mutexattr_settype(&recursive_attr, PTHREAD_MUTEX_RECURSIVE);
    pthread_mutex_init(&zh->to_send.lock, &recursive_attr);
    pthread_mutexattr_destroy(&recursive_attr);

    pthread_mutex_init(&zh->sent_requests.lock, 0);
    pthread_cond_init(&zh->sent_requests.cond, 0);
    pthread_mutex_init(&zh->completions_to_process.lock, 0);
    pthread_cond_init(&zh->completions_to_process.cond, 0);

    start_threads(zh);
    return 0;
}
273
274 void adaptor_finish(zhandle_t *zh)
275 {
276 struct adaptor_threads *adaptor_threads;
277 // make sure zh doesn't get destroyed until after we're done here
278 api_prolog(zh);
279 adaptor_threads = zh->adaptor_priv;
280 if(adaptor_threads==0) {
281 api_epilog(zh,0);
282 return;
283 }
284
285 if(!pthread_equal(adaptor_threads->io,pthread_self())){
286 wakeup_io_thread(zh);
287 pthread_join(adaptor_threads->io, 0);
288 }else
289 pthread_detach(adaptor_threads->io);
290
291 if(!pthread_equal(adaptor_threads->completion,pthread_self())){
292 pthread_mutex_lock(&zh->completions_to_process.lock);
293 pthread_cond_broadcast(&zh->completions_to_process.cond);
294 pthread_mutex_unlock(&zh->completions_to_process.lock);
295 pthread_join(adaptor_threads->completion, 0);
296 }else
297 pthread_detach(adaptor_threads->completion);
298
299 api_epilog(zh,0);
300 }
301
302 void adaptor_destroy(zhandle_t *zh)
303 {
304 struct adaptor_threads *adaptor = zh->adaptor_priv;
305 if(adaptor==0) return;
306
307 pthread_cond_destroy(&adaptor->cond);
308 pthread_mutex_destroy(&adaptor->lock);
309 pthread_mutex_destroy(&zh->to_process.lock);
310 pthread_mutex_destroy(&zh->to_send.lock);
311 pthread_mutex_destroy(&zh->sent_requests.lock);
312 pthread_cond_destroy(&zh->sent_requests.cond);
313 pthread_mutex_destroy(&zh->completions_to_process.lock);
314 pthread_cond_destroy(&zh->completions_to_process.cond);
315 pthread_mutex_destroy(&adaptor->zh_lock);
316
317 pthread_mutex_destroy(&zh->auth_h.lock);
318
319 close(adaptor->self_pipe[0]);
320 close(adaptor->self_pipe[1]);
321 free(adaptor);
322 zh->adaptor_priv=0;
323 }
324
325 int wakeup_io_thread(zhandle_t *zh)
326 {
327 struct adaptor_threads *adaptor_threads = zh->adaptor_priv;
328 char c=0;
329 #ifndef WIN32
330 return write(adaptor_threads->self_pipe[1],&c,1)==1? ZOK: ZSYSTEMERROR;
331 #else
332 return send(adaptor_threads->self_pipe[1], &c, 1, 0)==1? ZOK: ZSYSTEMERROR;
333 #endif
334 }
335
336 int adaptor_send_queue(zhandle_t *zh, int timeout)
337 {
338 if(!zh->close_requested)
339 return wakeup_io_thread(zh);
340 // don't rely on the IO thread to send the messages if the app has
341 // requested to close
342 return flush_send_queue(zh, timeout);
343 }
344
345 /* These two are declared here because we will run the event loop
346 * and not the client */
347 #ifdef WIN32
348 int zookeeper_interest(zhandle_t *zh, SOCKET *fd, int *interest,
349 struct timeval *tv);
350 #else
351 int zookeeper_interest(zhandle_t *zh, int *fd, int *interest,
352 struct timeval *tv);
353 #endif
354 int zookeeper_process(zhandle_t *zh, int events);
355
356 #ifdef WIN32
357 unsigned __stdcall do_io( void * v)
358 #else
359 void *do_io(void *v)
360 #endif
361 {
362 zhandle_t *zh = (zhandle_t*)v;
363 #ifndef WIN32
364 struct pollfd fds[2];
365 struct adaptor_threads *adaptor_threads = zh->adaptor_priv;
366
367 api_prolog(zh);
368 notify_thread_ready(zh);
369 LOG_DEBUG(LOGCALLBACK(zh), "started IO thread");
370 fds[0].fd=adaptor_threads->self_pipe[0];
371 fds[0].events=POLLIN;
372 while(!zh->close_requested) {
373 struct timeval tv;
374 int fd;
375 int interest;
376 int timeout;
377 int maxfd=1;
378
379 zh->io_count++;
380
381 zookeeper_interest(zh, &fd, &interest, &tv);
382 if (fd != -1) {
383 fds[1].fd=fd;
384 fds[1].events=(interest&ZOOKEEPER_READ)?POLLIN:0;
385 fds[1].events|=(interest&ZOOKEEPER_WRITE)?POLLOUT:0;
386 maxfd=2;
387 }
388 timeout=tv.tv_sec * 1000 + (tv.tv_usec/1000);
389
390 poll(fds,maxfd,timeout);
391 if (fd != -1) {
392 interest=(fds[1].revents&POLLIN)?ZOOKEEPER_READ:0;
393 interest|=((fds[1].revents&POLLOUT)||(fds[1].revents&POLLHUP))?ZOOKEEPER_WRITE:0;
394 }
395 if(fds[0].revents&POLLIN){
396 // flush the pipe
397 char b[128];
398 while(read(adaptor_threads->self_pipe[0],b,sizeof(b))==sizeof(b)){}
399 }
400 #else
401 fd_set rfds, wfds;
402 struct adaptor_threads *adaptor_threads = zh->adaptor_priv;
403 api_prolog(zh);
404 notify_thread_ready(zh);
405 LOG_DEBUG(LOGCALLBACK(zh), "started IO thread");
406
407 while(!zh->close_requested) {
408 struct timeval tv;
409 SOCKET fd;
410 int interest;
411 int rc;
412
413 zookeeper_interest(zh, &fd, &interest, &tv);
414
415 // FD_ZERO is cheap on Win32, it just sets count of elements to zero.
416 // It needs to be done to ensure no stale entries.
417 FD_ZERO(&rfds);
418 FD_ZERO(&wfds);
419
420 if (fd != -1) {
421 if (interest&ZOOKEEPER_READ) {
422 FD_SET(fd, &rfds);
423 }
424
425 if (interest&ZOOKEEPER_WRITE) {
426 FD_SET(fd, &wfds);
427 }
428 }
429
430 // Always interested in self_pipe.
431 FD_SET(adaptor_threads->self_pipe[0], &rfds);
432
433 rc = select(/* unused */0, &rfds, &wfds, NULL, &tv);
434 if (rc > 0) {
435 interest=(FD_ISSET(fd, &rfds))? ZOOKEEPER_READ: 0;
436 interest|=(FD_ISSET(fd, &wfds))? ZOOKEEPER_WRITE: 0;
437
438 if (FD_ISSET(adaptor_threads->self_pipe[0], &rfds)){
439 // flush the pipe/socket
440 char b[128];
441 while(recv(adaptor_threads->self_pipe[0],b,sizeof(b), 0)==sizeof(b)){}
442 }
443 }
444 else if (rc < 0) {
445 LOG_ERROR(LOGCALLBACK(zh), ("select() failed %d [%d].", rc, WSAGetLastError()));
446
447 // Clear interest events for zookeeper_process if select() fails.
448 interest = 0;
449 }
450
451 #endif
452 // dispatch zookeeper events
453 zookeeper_process(zh, interest);
454 // check the current state of the zhandle and terminate
455 // if it is_unrecoverable()
456 if(is_unrecoverable(zh))
457 break;
458 }
459 api_epilog(zh, 0);
460 LOG_DEBUG(LOGCALLBACK(zh), "IO thread terminated");
461 return 0;
462 }
463
464 #ifdef WIN32
465 unsigned __stdcall do_completion( void * v)
466 #else
467 void *do_completion(void *v)
468 #endif
469 {
470 zhandle_t *zh = v;
471 api_prolog(zh);
472 notify_thread_ready(zh);
473 LOG_DEBUG(LOGCALLBACK(zh), "started completion thread");
474 while(!zh->close_requested) {
475 pthread_mutex_lock(&zh->completions_to_process.lock);
476 while(!zh->completions_to_process.head && !zh->close_requested) {
477 pthread_cond_wait(&zh->completions_to_process.cond, &zh->completions_to_process.lock);
478 }
479 pthread_mutex_unlock(&zh->completions_to_process.lock);
480 process_completions(zh);
481 }
482 api_epilog(zh, 0);
483 LOG_DEBUG(LOGCALLBACK(zh), "completion thread terminated");
484 return 0;
485 }
486
487 int32_t inc_ref_counter(zhandle_t* zh,int i)
488 {
489 int incr=(i<0?-1:(i>0?1:0));
490 // fetch_and_add implements atomic post-increment
491 int v=fetch_and_add(&zh->ref_counter,incr);
492 // inc_ref_counter wants pre-increment
493 v+=incr; // simulate pre-increment
494 return v;
495 }
496
/*
 * Atomically add `incr` to *operand and return the value it held before the
 * addition. Uses InterlockedExchangeAdd on Windows and the GCC
 * __sync_fetch_and_add builtin elsewhere.
 */
int32_t fetch_and_add(volatile int32_t* operand, int incr)
{
    int32_t previous;

#ifdef WIN32
    previous = InterlockedExchangeAdd(operand, incr);
#else
    previous = __sync_fetch_and_add(operand, incr);
#endif
    return previous;
}
505
/*
 * Return the next transaction id and atomically advance the counter.
 * The counter is seeded from time(0) on its first call. The constructor
 * attribute makes that first call happen at load time, before any threads
 * start; that call consumes the seed value itself.
 */
__attribute__((constructor)) int32_t get_xid()
{
    const int32_t unseeded = -1;
    static int32_t next_xid = -1;

    if (next_xid == unseeded) {
        next_xid = time(0);
    }
    return fetch_and_add(&next_xid, 1);
}
515
516 int lock_reconfig(struct _zhandle *zh)
517 {
518 struct adaptor_threads *adaptor = zh->adaptor_priv;
519 if (adaptor) {
520 return pthread_mutex_lock(&adaptor->reconfig_lock);
521 } else {
522 return 0;
523 }
524 }
525 int unlock_reconfig(struct _zhandle *zh)
526 {
527 struct adaptor_threads *adaptor = zh->adaptor_priv;
528 if (adaptor) {
529 return pthread_mutex_unlock(&adaptor->reconfig_lock);
530 } else {
531 return 0;
532 }
533 }
534
535 int lock_watchers(struct _zhandle *zh)
536 {
537 struct adaptor_threads *adaptor = zh->adaptor_priv;
538 if (adaptor) {
539 return pthread_mutex_lock(&adaptor->watchers_lock);
540 } else {
541 return 0;
542 }
543 }
544 int unlock_watchers(struct _zhandle *zh)
545 {
546 struct adaptor_threads *adaptor = zh->adaptor_priv;
547 if (adaptor) {
548 return pthread_mutex_unlock(&adaptor->watchers_lock);
549 } else {
550 return 0;
551 }
552 }
553
554 int enter_critical(zhandle_t* zh)
555 {
556 struct adaptor_threads *adaptor = zh->adaptor_priv;
557 if (adaptor) {
558 return pthread_mutex_lock(&adaptor->zh_lock);
559 } else {
560 return 0;
561 }
562 }
563
564 int leave_critical(zhandle_t* zh)
565 {
566 struct adaptor_threads *adaptor = zh->adaptor_priv;
567 if (adaptor) {
568 return pthread_mutex_unlock(&adaptor->zh_lock);
569 } else {
570 return 0;
571 }
572 }
573