1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 #ifndef THREADED
19 #define THREADED
20 #endif
21 
22 #if !defined(DLL_EXPORT) && !defined(USE_STATIC_LIB)
23 #  define USE_STATIC_LIB
24 #endif
25 
26 #ifndef _GNU_SOURCE
27 #define _GNU_SOURCE
28 #endif
29 
30 #include "zk_adaptor.h"
31 #include "zookeeper_log.h"
32 
33 #include <stdlib.h>
34 #include <stdio.h>
35 #include <time.h>
36 #include <fcntl.h>
37 #include <assert.h>
38 #include <errno.h>
39 
40 #ifndef WIN32
41 #include <signal.h>
42 #include <poll.h>
43 #include <unistd.h>
44 #include <sys/time.h>
45 #endif
46 
/* Lock zh->auth_h.lock; returns the pthread_mutex_lock() result. */
int zoo_lock_auth(zhandle_t *zh)
{
    int rc = pthread_mutex_lock(&zh->auth_h.lock);
    return rc;
}
/* Unlock zh->auth_h.lock; returns the pthread_mutex_unlock() result. */
int zoo_unlock_auth(zhandle_t *zh)
{
    int rc = pthread_mutex_unlock(&zh->auth_h.lock);
    return rc;
}
/* Lock the mutex embedded in buffer list head `l`;
 * returns the pthread_mutex_lock() result. */
int lock_buffer_list(buffer_head_t *l)
{
    int rc = pthread_mutex_lock(&l->lock);
    return rc;
}
/* Unlock the mutex embedded in buffer list head `l`;
 * returns the pthread_mutex_unlock() result. */
int unlock_buffer_list(buffer_head_t *l)
{
    int rc = pthread_mutex_unlock(&l->lock);
    return rc;
}
/* Lock the mutex embedded in completion list head `l`;
 * returns the pthread_mutex_lock() result. */
int lock_completion_list(completion_head_t *l)
{
    int rc = pthread_mutex_lock(&l->lock);
    return rc;
}
/* Wake every thread waiting on the list's condition variable, then release
 * the list mutex. Returns the pthread_mutex_unlock() result; the result of
 * the broadcast is not reported. */
int unlock_completion_list(completion_head_t *l)
{
    int rc;

    pthread_cond_broadcast(&l->cond);
    rc = pthread_mutex_unlock(&l->lock);
    return rc;
}
alloc_sync_completion(void)72 struct sync_completion *alloc_sync_completion(void)
73 {
74     struct sync_completion *sc = (struct sync_completion*)calloc(1, sizeof(struct sync_completion));
75     if (sc) {
76        pthread_cond_init(&sc->cond, 0);
77        pthread_mutex_init(&sc->lock, 0);
78     }
79     return sc;
80 }
wait_sync_completion(struct sync_completion * sc)81 int wait_sync_completion(struct sync_completion *sc)
82 {
83     pthread_mutex_lock(&sc->lock);
84     while (!sc->complete) {
85         pthread_cond_wait(&sc->cond, &sc->lock);
86     }
87     pthread_mutex_unlock(&sc->lock);
88     return 0;
89 }
90 
free_sync_completion(struct sync_completion * sc)91 void free_sync_completion(struct sync_completion *sc)
92 {
93     if (sc) {
94         pthread_mutex_destroy(&sc->lock);
95         pthread_cond_destroy(&sc->cond);
96         free(sc);
97     }
98 }
99 
notify_sync_completion(struct sync_completion * sc)100 void notify_sync_completion(struct sync_completion *sc)
101 {
102     pthread_mutex_lock(&sc->lock);
103     sc->complete = 1;
104     pthread_cond_broadcast(&sc->cond);
105     pthread_mutex_unlock(&sc->lock);
106 }
107 
/* In this adaptor the argument is ignored and 0 is always returned. */
int process_async(int outstanding_sync)
{
    (void)outstanding_sync; /* intentionally unused */
    return 0;
}
112 
113 #ifdef WIN32
114 unsigned __stdcall do_io( void * );
115 unsigned __stdcall do_completion( void * );
116 
/* Log `message` together with WSAGetLastError() at error level, close
 * `sock`, and return -1 so callers can write `return handle_error(...)`. */
int handle_error(zhandle_t* zh, SOCKET sock, char* message)
{
    LOG_ERROR(LOGCALLBACK(zh), "%s. %d",message, WSAGetLastError());
    closesocket(sock);

    return -1;
}
123 
124 //--create socket pair for interupting selects.
create_socket_pair(zhandle_t * zh,SOCKET fds[2])125 int create_socket_pair(zhandle_t* zh, SOCKET fds[2])
126 {
127     struct sockaddr_in inaddr;
128     struct sockaddr addr;
129     int yes=1;
130     int len=0;
131 
132     SOCKET lst=socket(AF_INET, SOCK_STREAM,IPPROTO_TCP);
133     if (lst ==  INVALID_SOCKET ){
134        LOG_ERROR(LOGCALLBACK(zh), "Error creating socket. %d",WSAGetLastError());
135        return -1;
136     }
137     memset(&inaddr, 0, sizeof(inaddr));
138     memset(&addr, 0, sizeof(addr));
139     inaddr.sin_family = AF_INET;
140     inaddr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
141     inaddr.sin_port = 0; //--system assigns the port
142 
143     if ( setsockopt(lst,SOL_SOCKET,SO_REUSEADDR,(char*)&yes,sizeof(yes)) == SOCKET_ERROR  ) {
144        return handle_error(zh, lst,"Error trying to set socket option.");
145     }
146     if (bind(lst,(struct sockaddr *)&inaddr,sizeof(inaddr)) == SOCKET_ERROR){
147        return handle_error(zh, lst,"Error trying to bind socket.");
148     }
149     if (listen(lst,1) == SOCKET_ERROR){
150        return handle_error(zh, lst,"Error trying to listen on socket.");
151     }
152     len=sizeof(inaddr);
153     getsockname(lst, &addr,&len);
154     fds[0]=socket(AF_INET, SOCK_STREAM,0);
155     if (connect(fds[0],&addr,len) == SOCKET_ERROR){
156        return handle_error(zh, lst, "Error while connecting to socket.");
157     }
158     if ((fds[1]=accept(lst,0,0)) == INVALID_SOCKET){
159        closesocket(fds[0]);
160        return handle_error(zh, lst, "Error while accepting socket connection.");
161     }
162     closesocket(lst);
163     return 0;
164 }
165 #else
166 void *do_io(void *);
167 void *do_completion(void *);
168 #endif
169 
170 
171 int wakeup_io_thread(zhandle_t *zh);
172 
173 #ifdef WIN32
/* Switch socket `fd` to non-blocking mode via ioctlsocket(FIONBIO).
 * Returns 1 when ioctlsocket() succeeds and -1 otherwise. */
static int set_nonblock(SOCKET fd){
    ULONG enable = 1;

    return (ioctlsocket(fd, FIONBIO, &enable) == 0) ? 1 : -1;
}
181 #else
/* Put descriptor `fd` into non-blocking mode.
 * Returns 0 when O_NONBLOCK was already set, otherwise the result of
 * fcntl(F_SETFL). Returns -1 when the current flags cannot be read;
 * without that check the -1 from F_GETFL would pass the O_NONBLOCK test
 * and be reported as success. */
static int set_nonblock(int fd){
    int flags = fcntl(fd, F_GETFL);

    if (flags == -1) {
        return -1;
    }
    if (flags & O_NONBLOCK) {
        return 0;
    }
    return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}
187 #endif
188 
/* Block until the adaptor's threadsToWait counter is no longer positive.
 * The counter is tested under adaptor->lock and the caller sleeps on
 * adaptor->cond between tests. */
void wait_for_others(zhandle_t* zh)
{
    struct adaptor_threads *threads = zh->adaptor_priv;

    pthread_mutex_lock(&threads->lock);
    while (threads->threadsToWait > 0) {
        pthread_cond_wait(&threads->cond, &threads->lock);
    }
    pthread_mutex_unlock(&threads->lock);
}
197 
/* Announce that the calling thread is ready: decrement threadsToWait,
 * wake all waiters on adaptor->cond, then wait until the counter is no
 * longer positive before returning. */
void notify_thread_ready(zhandle_t* zh)
{
    struct adaptor_threads *threads = zh->adaptor_priv;

    pthread_mutex_lock(&threads->lock);
    threads->threadsToWait -= 1;
    pthread_cond_broadcast(&threads->cond);
    while (threads->threadsToWait > 0) {
        pthread_cond_wait(&threads->cond, &threads->lock);
    }
    pthread_mutex_unlock(&threads->lock);
}
208 
209 
/* Initialise the start-up barrier (cond + lock + counter), spawn the IO and
 * completion threads, and return once wait_for_others() observes that the
 * barrier counter has dropped to zero.
 * A pthread_create() failure is only detected through assert(). */
void start_threads(zhandle_t* zh)
{
    struct adaptor_threads *threads = zh->adaptor_priv;
    int rc = 0;

    pthread_cond_init(&threads->cond, 0);
    pthread_mutex_init(&threads->lock, 0);
    /* the barrier opens after two threads have reported in */
    threads->threadsToWait = 2;

    /* api_prolog()/api_epilog() bracket the start-up so the zhandle is not
     * destroyed while initialisation is still in progress */
    api_prolog(zh);
    LOG_DEBUG(LOGCALLBACK(zh), "starting threads...");

    rc = pthread_create(&threads->io, 0, do_io, zh);
    assert("pthread_create() failed for the IO thread"&&!rc);

    rc = pthread_create(&threads->completion, 0, do_completion, zh);
    assert("pthread_create() failed for the completion thread"&&!rc);

    wait_for_others(zh);
    api_epilog(zh, 0);
}
229 
/* Set up the threaded adaptor for `zh`: allocate the adaptor_threads state,
 * create the non-blocking self-pipe (a socket pair on WIN32), initialise
 * the mutexes and condition variables of the handle and the adaptor, store
 * the state in zh->adaptor_priv and start the worker threads.
 * Returns 0 on success, or -1 when the allocation or the pipe/socket-pair
 * creation fails (nothing is left allocated in that case). */
int adaptor_init(zhandle_t *zh)
{
    pthread_mutexattr_t recursive_attr;
    struct adaptor_threads *threads = calloc(1, sizeof *threads);

    if (threads == NULL) {
        LOG_ERROR(LOGCALLBACK(zh), "Out of memory");
        return -1;
    }

    /* A pipe interrupts the IO wait on unix/sol; a socket pair does the
     * same job on windows. */
#ifdef WIN32
    if (create_socket_pair(zh, threads->self_pipe) == -1) {
        LOG_ERROR(LOGCALLBACK(zh), "Can't make a socket.");
        free(threads);
        return -1;
    }
#else
    if (pipe(threads->self_pipe) == -1) {
        LOG_ERROR(LOGCALLBACK(zh), "Can't make a pipe %d",errno);
        free(threads);
        return -1;
    }
#endif
    set_nonblock(threads->self_pipe[1]);
    set_nonblock(threads->self_pipe[0]);

    pthread_mutex_init(&zh->auth_h.lock, 0);

    zh->adaptor_priv = threads;
    pthread_mutex_init(&zh->to_process.lock, 0);
    pthread_mutex_init(&threads->zh_lock, 0);
    pthread_mutex_init(&threads->reconfig_lock, 0);
    pthread_mutex_init(&threads->watchers_lock, 0);

    /* the to_send lock has to be a recursive mutex */
    pthread_mutexattr_init(&recursive_attr);
    pthread_mutexattr_settype(&recursive_attr, PTHREAD_MUTEX_RECURSIVE);
    pthread_mutex_init(&zh->to_send.lock, &recursive_attr);
    pthread_mutexattr_destroy(&recursive_attr);

    pthread_mutex_init(&zh->sent_requests.lock, 0);
    pthread_cond_init(&zh->sent_requests.cond, 0);
    pthread_mutex_init(&zh->completions_to_process.lock, 0);
    pthread_cond_init(&zh->completions_to_process.cond, 0);

    start_threads(zh);
    return 0;
}
273 
274 void adaptor_finish(zhandle_t *zh)
275 {
276     struct adaptor_threads *adaptor_threads;
277     // make sure zh doesn't get destroyed until after we're done here
278     api_prolog(zh);
279     adaptor_threads = zh->adaptor_priv;
280     if(adaptor_threads==0) {
281         api_epilog(zh,0);
282         return;
283     }
284 
285     if(!pthread_equal(adaptor_threads->io,pthread_self())){
286         wakeup_io_thread(zh);
287         pthread_join(adaptor_threads->io, 0);
288     }else
289         pthread_detach(adaptor_threads->io);
290 
291     if(!pthread_equal(adaptor_threads->completion,pthread_self())){
292         pthread_mutex_lock(&zh->completions_to_process.lock);
293         pthread_cond_broadcast(&zh->completions_to_process.cond);
294         pthread_mutex_unlock(&zh->completions_to_process.lock);
295         pthread_join(adaptor_threads->completion, 0);
296     }else
297         pthread_detach(adaptor_threads->completion);
298 
299     api_epilog(zh,0);
300 }
301 
302 void adaptor_destroy(zhandle_t *zh)
303 {
304     struct adaptor_threads *adaptor = zh->adaptor_priv;
305     if(adaptor==0) return;
306 
307     pthread_cond_destroy(&adaptor->cond);
308     pthread_mutex_destroy(&adaptor->lock);
309     pthread_mutex_destroy(&zh->to_process.lock);
310     pthread_mutex_destroy(&zh->to_send.lock);
311     pthread_mutex_destroy(&zh->sent_requests.lock);
312     pthread_cond_destroy(&zh->sent_requests.cond);
313     pthread_mutex_destroy(&zh->completions_to_process.lock);
314     pthread_cond_destroy(&zh->completions_to_process.cond);
315     pthread_mutex_destroy(&adaptor->zh_lock);
316 
317     pthread_mutex_destroy(&zh->auth_h.lock);
318 
319     close(adaptor->self_pipe[0]);
320     close(adaptor->self_pipe[1]);
321     free(adaptor);
322     zh->adaptor_priv=0;
323 }
324 
325 int wakeup_io_thread(zhandle_t *zh)
326 {
327     struct adaptor_threads *adaptor_threads = zh->adaptor_priv;
328     char c=0;
329 #ifndef WIN32
330     return write(adaptor_threads->self_pipe[1],&c,1)==1? ZOK: ZSYSTEMERROR;
331 #else
332     return send(adaptor_threads->self_pipe[1], &c, 1, 0)==1? ZOK: ZSYSTEMERROR;
333 #endif
334 }
335 
336 int adaptor_send_queue(zhandle_t *zh, int timeout)
337 {
338     if(!zh->close_requested)
339         return wakeup_io_thread(zh);
340     // don't rely on the IO thread to send the messages if the app has
341     // requested to close
342     return flush_send_queue(zh, timeout);
343 }
344 
345 /* These two are declared here because we will run the event loop
346  * and not the client */
347 #ifdef WIN32
348 int zookeeper_interest(zhandle_t *zh, SOCKET *fd, int *interest,
349         struct timeval *tv);
350 #else
351 int zookeeper_interest(zhandle_t *zh, int *fd, int *interest,
352         struct timeval *tv);
353 #endif
354 int zookeeper_process(zhandle_t *zh, int events);
355 
356 #ifdef WIN32
357 unsigned __stdcall do_io( void * v)
358 #else
359 void *do_io(void *v)
360 #endif
361 {
362     zhandle_t *zh = (zhandle_t*)v;
363 #ifndef WIN32
364     struct pollfd fds[2];
365     struct adaptor_threads *adaptor_threads = zh->adaptor_priv;
366 
367     api_prolog(zh);
368     notify_thread_ready(zh);
369     LOG_DEBUG(LOGCALLBACK(zh), "started IO thread");
370     fds[0].fd=adaptor_threads->self_pipe[0];
371     fds[0].events=POLLIN;
372     while(!zh->close_requested) {
373         struct timeval tv;
374         int fd;
375         int interest;
376         int timeout;
377         int maxfd=1;
378 
379         zh->io_count++;
380 
381         zookeeper_interest(zh, &fd, &interest, &tv);
382         if (fd != -1) {
383             fds[1].fd=fd;
384             fds[1].events=(interest&ZOOKEEPER_READ)?POLLIN:0;
385             fds[1].events|=(interest&ZOOKEEPER_WRITE)?POLLOUT:0;
386             maxfd=2;
387         }
388         timeout=tv.tv_sec * 1000 + (tv.tv_usec/1000);
389 
390         poll(fds,maxfd,timeout);
391         if (fd != -1) {
392             interest=(fds[1].revents&POLLIN)?ZOOKEEPER_READ:0;
393             interest|=((fds[1].revents&POLLOUT)||(fds[1].revents&POLLHUP))?ZOOKEEPER_WRITE:0;
394         }
395         if(fds[0].revents&POLLIN){
396             // flush the pipe
397             char b[128];
398             while(read(adaptor_threads->self_pipe[0],b,sizeof(b))==sizeof(b)){}
399         }
400 #else
401     fd_set rfds, wfds;
402     struct adaptor_threads *adaptor_threads = zh->adaptor_priv;
403     api_prolog(zh);
404     notify_thread_ready(zh);
405     LOG_DEBUG(LOGCALLBACK(zh), "started IO thread");
406 
407     while(!zh->close_requested) {
408         struct timeval tv;
409         SOCKET fd;
410         int interest;
411         int rc;
412 
413         zookeeper_interest(zh, &fd, &interest, &tv);
414 
415         // FD_ZERO is cheap on Win32, it just sets count of elements to zero.
416         // It needs to be done to ensure no stale entries.
417         FD_ZERO(&rfds);
418         FD_ZERO(&wfds);
419 
420         if (fd != -1) {
421             if (interest&ZOOKEEPER_READ) {
422                 FD_SET(fd, &rfds);
423             }
424 
425             if (interest&ZOOKEEPER_WRITE) {
426                 FD_SET(fd, &wfds);
427             }
428         }
429 
430         // Always interested in self_pipe.
431         FD_SET(adaptor_threads->self_pipe[0], &rfds);
432 
433         rc = select(/* unused */0, &rfds, &wfds, NULL, &tv);
434         if (rc > 0) {
435             interest=(FD_ISSET(fd, &rfds))? ZOOKEEPER_READ: 0;
436             interest|=(FD_ISSET(fd, &wfds))? ZOOKEEPER_WRITE: 0;
437 
438             if (FD_ISSET(adaptor_threads->self_pipe[0], &rfds)){
439                 // flush the pipe/socket
440                 char b[128];
441                 while(recv(adaptor_threads->self_pipe[0],b,sizeof(b), 0)==sizeof(b)){}
442             }
443         }
444         else if (rc < 0) {
445             LOG_ERROR(LOGCALLBACK(zh), ("select() failed %d [%d].", rc, WSAGetLastError()));
446 
447             // Clear interest events for zookeeper_process if select() fails.
448             interest = 0;
449         }
450 
451 #endif
452         // dispatch zookeeper events
453         zookeeper_process(zh, interest);
454         // check the current state of the zhandle and terminate
455         // if it is_unrecoverable()
456         if(is_unrecoverable(zh))
457             break;
458     }
459     api_epilog(zh, 0);
460     LOG_DEBUG(LOGCALLBACK(zh), "IO thread terminated");
461     return 0;
462 }
463 
464 #ifdef WIN32
465 unsigned __stdcall do_completion( void * v)
466 #else
467 void *do_completion(void *v)
468 #endif
469 {
470     zhandle_t *zh = v;
471     api_prolog(zh);
472     notify_thread_ready(zh);
473     LOG_DEBUG(LOGCALLBACK(zh), "started completion thread");
474     while(!zh->close_requested) {
475         pthread_mutex_lock(&zh->completions_to_process.lock);
476         while(!zh->completions_to_process.head && !zh->close_requested) {
477             pthread_cond_wait(&zh->completions_to_process.cond, &zh->completions_to_process.lock);
478         }
479         pthread_mutex_unlock(&zh->completions_to_process.lock);
480         process_completions(zh);
481     }
482     api_epilog(zh, 0);
483     LOG_DEBUG(LOGCALLBACK(zh), "completion thread terminated");
484     return 0;
485 }
486 
487 int32_t inc_ref_counter(zhandle_t* zh,int i)
488 {
489     int incr=(i<0?-1:(i>0?1:0));
490     // fetch_and_add implements atomic post-increment
491     int v=fetch_and_add(&zh->ref_counter,incr);
492     // inc_ref_counter wants pre-increment
493     v+=incr;   // simulate pre-increment
494     return v;
495 }
496 
/* Atomically add `incr` to *operand and return the value *operand held
 * before the addition (InterlockedExchangeAdd on WIN32, the
 * __sync_fetch_and_add builtin elsewhere). */
int32_t fetch_and_add(volatile int32_t* operand, int incr)
{
#ifdef WIN32
    return InterlockedExchangeAdd(operand, incr);
#else
    return __sync_fetch_and_add(operand, incr);
#endif
}
505 
// Hand out the next xid: returns the current value of a function-local
// static counter and atomically advances it by one.
// The counter starts at the -1 sentinel and is seeded from time(0) on the
// first call; the function is marked as a constructor so that this seeding
// happens before any threads are started.
__attribute__((constructor)) int32_t get_xid()
{
    static int32_t next_xid = -1;

    if (next_xid == -1) {
        next_xid = time(0);
    }

    return fetch_and_add(&next_xid, 1);
}
515 
516 int lock_reconfig(struct _zhandle *zh)
517 {
518     struct adaptor_threads *adaptor = zh->adaptor_priv;
519     if (adaptor) {
520         return pthread_mutex_lock(&adaptor->reconfig_lock);
521     } else {
522         return 0;
523     }
524 }
525 int unlock_reconfig(struct _zhandle *zh)
526 {
527     struct adaptor_threads *adaptor = zh->adaptor_priv;
528     if (adaptor) {
529         return pthread_mutex_unlock(&adaptor->reconfig_lock);
530     } else {
531         return 0;
532     }
533 }
534 
535 int lock_watchers(struct _zhandle *zh)
536 {
537     struct adaptor_threads *adaptor = zh->adaptor_priv;
538     if (adaptor) {
539         return pthread_mutex_lock(&adaptor->watchers_lock);
540     } else {
541         return 0;
542     }
543 }
544 int unlock_watchers(struct _zhandle *zh)
545 {
546     struct adaptor_threads *adaptor = zh->adaptor_priv;
547     if (adaptor) {
548         return pthread_mutex_unlock(&adaptor->watchers_lock);
549     } else {
550         return 0;
551     }
552 }
553 
554 int enter_critical(zhandle_t* zh)
555 {
556     struct adaptor_threads *adaptor = zh->adaptor_priv;
557     if (adaptor) {
558         return pthread_mutex_lock(&adaptor->zh_lock);
559     } else {
560         return 0;
561     }
562 }
563 
564 int leave_critical(zhandle_t* zh)
565 {
566     struct adaptor_threads *adaptor = zh->adaptor_priv;
567     if (adaptor) {
568         return pthread_mutex_unlock(&adaptor->zh_lock);
569     } else {
570         return 0;
571     }
572 }
573