1 /**
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements.  See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership.  The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License.  You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 
19 #include <cppunit/extensions/HelperMacros.h>
20 #include "CppAssertHelper.h"
21 
22 #include <signal.h>
23 #include <stdlib.h>
24 #include <unistd.h>
25 
26 #include "Vector.h"
27 using namespace std;
28 
29 #include <zookeeper.h>
30 
31 #include "Util.h"
32 #include "WatchUtil.h"
33 
34 class Zookeeper_clientretry : public CPPUNIT_NS::TestFixture
35 {
36     CPPUNIT_TEST_SUITE(Zookeeper_clientretry);
37 #ifdef THREADED
38     CPPUNIT_TEST(testRetry);
39 #endif
40     CPPUNIT_TEST_SUITE_END();
41 
watcher(zhandle_t *,int type,int state,const char * path,void * v)42     static void watcher(zhandle_t *, int type, int state, const char *path,void*v){
43         watchctx_t *ctx = (watchctx_t*)v;
44 
45         if (state == ZOO_CONNECTED_STATE) {
46             ctx->connected = true;
47         } else {
48             ctx->connected = false;
49         }
50         if (type != ZOO_SESSION_EVENT) {
51             evt_t evt;
52             evt.path = path;
53             evt.type = type;
54             ctx->putEvent(evt);
55         }
56     }
57 
58     static const char hostPorts[];
59 
getHostPorts()60     const char *getHostPorts() {
61         return hostPorts;
62     }
63 
createClient(watchctx_t * ctx)64     zhandle_t *createClient(watchctx_t *ctx) {
65         zhandle_t *zk = zookeeper_init(hostPorts, watcher, 10000, 0,
66                                        ctx, 0);
67         ctx->zh = zk;
68         sleep(1);
69         return zk;
70     }
71 
72     FILE *logfile;
73 public:
74 
Zookeeper_clientretry()75     Zookeeper_clientretry() {
76       logfile = openlogfile("Zookeeper_clientretry");
77     }
78 
~Zookeeper_clientretry()79     ~Zookeeper_clientretry() {
80       if (logfile) {
81         fflush(logfile);
82         fclose(logfile);
83         logfile = 0;
84       }
85     }
86 
setUp()87     void setUp()
88     {
89         zoo_set_log_stream(logfile);
90 
91         char cmd[1024];
92         sprintf(cmd, "%s stop %s", ZKSERVER_CMD, getHostPorts());
93         CPPUNIT_ASSERT(system(cmd) == 0);
94 
95         /* we are testing that if max cnxns is exceeded the server does the right thing */
96         sprintf(cmd, "ZKMAXCNXNS=1 %s startClean %s", ZKSERVER_CMD, getHostPorts());
97         CPPUNIT_ASSERT(system(cmd) == 0);
98 
99         struct sigaction act;
100         act.sa_handler = SIG_IGN;
101         sigemptyset(&act.sa_mask);
102         act.sa_flags = 0;
103         CPPUNIT_ASSERT(sigaction(SIGPIPE, &act, NULL) == 0);
104     }
105 
tearDown()106     void tearDown()
107     {
108         char cmd[1024];
109         sprintf(cmd, "%s stop %s", ZKSERVER_CMD, getHostPorts());
110         CPPUNIT_ASSERT(system(cmd) == 0);
111 
112         /* restart the server in "normal" mode */
113         sprintf(cmd, "%s startClean %s", ZKSERVER_CMD, getHostPorts());
114         CPPUNIT_ASSERT(system(cmd) == 0);
115 
116         struct sigaction act;
117         act.sa_handler = SIG_IGN;
118         sigemptyset(&act.sa_mask);
119         act.sa_flags = 0;
120         CPPUNIT_ASSERT(sigaction(SIGPIPE, &act, NULL) == 0);
121     }
122 
waitForEvent(zhandle_t * zh,watchctx_t * ctx,int seconds)123     bool waitForEvent(zhandle_t *zh, watchctx_t *ctx, int seconds) {
124         time_t expires = time(0) + seconds;
125         while(ctx->countEvents() == 0 && time(0) < expires) {
126             yield(zh, 1);
127         }
128         return ctx->countEvents() > 0;
129     }
130 
131     static zhandle_t *async_zk;
132 
testRetry()133     void testRetry()
134     {
135       watchctx_t ctx1, ctx2;
136       zhandle_t *zk1 = createClient(&ctx1);
137       CPPUNIT_ASSERT_EQUAL(true, ctx1.waitForConnected(zk1));
138       zhandle_t *zk2 = createClient(&ctx2);
139       zookeeper_close(zk1);
140       CPPUNIT_ASSERT_EQUAL(true, ctx2.waitForConnected(zk2));
141       ctx1.zh = 0;
142     }
143 };
144 
145 zhandle_t *Zookeeper_clientretry::async_zk;
146 const char Zookeeper_clientretry::hostPorts[] = "127.0.0.1:22181";
147 CPPUNIT_TEST_SUITE_REGISTRATION(Zookeeper_clientretry);
148