1 /**
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with this
4 * work for additional information regarding copyright ownership. The ASF
5 * licenses this file to you under the Apache License, Version 2.0 (the
6 * "License"); you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14 * License for the specific language governing permissions and limitations under
15 * the License.
16 */
17 #include "ZooKeeperQuorumServer.h"
18
19 #include <cassert>
20 #include <cstdio>
21 #include <cstdlib>
22 #include <fstream>
23 #include <sstream>
24 #include <vector>
25 #include <utility>
26 #include <unistd.h>
27
28 ZooKeeperQuorumServer::
ZooKeeperQuorumServer(uint32_t id,uint32_t numServers,std::string config,std::string env)29 ZooKeeperQuorumServer(uint32_t id, uint32_t numServers, std::string config, std::string env) :
30 id_(id),
31 env_(env),
32 numServers_(numServers) {
33 const char* root = getenv("ZKROOT");
34 if (root == NULL) {
35 assert(!"Environment variable 'ZKROOT' is not set");
36 }
37 root_ = root;
38 createConfigFile(config);
39 createDataDirectory();
40 start();
41 }
42
43 ZooKeeperQuorumServer::
~ZooKeeperQuorumServer()44 ~ZooKeeperQuorumServer() {
45 stop();
46 }
47
48 std::string ZooKeeperQuorumServer::
getHostPort()49 getHostPort() {
50 std::stringstream ss;
51 ss << "localhost:" << getClientPort();
52 return ss.str();
53 }
54
55 uint32_t ZooKeeperQuorumServer::
getClientPort()56 getClientPort() {
57 return CLIENT_PORT_BASE + id_;
58 }
59
60 void ZooKeeperQuorumServer::
start()61 start() {
62 std::string command = root_ + "/bin/zkServer.sh start " +
63 getConfigFileName();
64 if (!env_.empty()) {
65 command = env_ + " " + command;
66 }
67 assert(system(command.c_str()) == 0);
68 }
69
70 void ZooKeeperQuorumServer::
stop()71 stop() {
72 std::string command = root_ + "/bin/zkServer.sh stop " +
73 getConfigFileName();
74 assert(system(command.c_str()) == 0);
75 }
76
77 std::string ZooKeeperQuorumServer::
getMode()78 getMode() {
79 char buf[1024];
80 std::string result;
81 std::string command = root_ + "/bin/zkServer.sh status " +
82 getConfigFileName();
83 FILE* output = popen(command.c_str(), "r");
84 do {
85 if (fgets(buf, 1024, output) != NULL) {
86 result += buf;
87 }
88 } while (!feof(output));
89 pclose(output);
90 if (result.find("Mode: leader") != std::string::npos) {
91 return "leader";
92 } else if (result.find("Mode: follower") != std::string::npos) {
93 return "follower";
94 } else {
95 printf("%s\n", result.c_str());
96 return "";
97 }
98 }
99
100 bool ZooKeeperQuorumServer::
isLeader()101 isLeader() {
102 return getMode() == "leader";
103 }
104
105 bool ZooKeeperQuorumServer::
isFollower()106 isFollower() {
107 return getMode() == "follower";
108 }
109
110 void ZooKeeperQuorumServer::
createConfigFile(std::string config)111 createConfigFile(std::string config) {
112 std::string command = "mkdir -p " + root_ + "/build/test/test-cppunit/conf";
113 assert(system(command.c_str()) == 0);
114 std::ofstream confFile;
115 std::stringstream ss;
116 ss << id_ << ".conf";
117 std::string fileName = root_ + "/build/test/test-cppunit/conf/" + ss.str();
118 confFile.open(fileName.c_str());
119 confFile << "tickTime=2000\n";
120 confFile << "clientPort=" << getClientPort() << "\n";
121 confFile << "initLimit=5\n";
122 confFile << "syncLimit=2\n";
123 confFile << "dataDir=" << getDataDirectory() << "\n";
124 for (uint32_t i = 0; i < numServers_; i++) {
125 confFile << getServerString(i) << "\n";
126 }
127 // Append additional config, if any.
128 if (!config.empty()) {
129 confFile << config << std::endl;
130 }
131 confFile.close();
132 }
133
134 std::string ZooKeeperQuorumServer::
getConfigFileName()135 getConfigFileName() {
136 std::stringstream ss;
137 ss << id_ << ".conf";
138 return root_ + "/build/test/test-cppunit/conf/" + ss.str();
139 }
140
141 void ZooKeeperQuorumServer::
createDataDirectory()142 createDataDirectory() {
143 std::string dataDirectory = getDataDirectory();
144 std::string command = "rm -rf " + dataDirectory;
145 assert(system(command.c_str()) == 0);
146 command = "mkdir -p " + dataDirectory;
147 assert(system(command.c_str()) == 0);
148 std::ofstream myidFile;
149 std::string fileName = dataDirectory + "/myid";
150 myidFile.open(fileName.c_str());
151 myidFile << id_ << "\n";
152 myidFile.close();
153 setenv("ZOO_LOG_DIR", dataDirectory.c_str(), true);
154 }
155
156 std::string ZooKeeperQuorumServer::
getServerString()157 getServerString() {
158 return getServerString(id_);
159 }
160
161 std::string ZooKeeperQuorumServer::
getServerString(uint32_t id)162 getServerString(uint32_t id) {
163 std::stringstream ss;
164 ss << "server." << id << "=localhost:" << SERVER_PORT_BASE + id <<
165 ":" << ELECTION_PORT_BASE + id << ":participant;localhost:" <<
166 CLIENT_PORT_BASE + id;
167 return ss.str();
168 }
169
170 std::string ZooKeeperQuorumServer::
getDataDirectory()171 getDataDirectory() {
172 std::stringstream ss;
173 ss << "data" << id_;
174 return root_ + "/build/test/test-cppunit/" + ss.str();
175 }
176
177 std::vector<ZooKeeperQuorumServer*> ZooKeeperQuorumServer::
getCluster(uint32_t numServers)178 getCluster(uint32_t numServers) {
179 std::vector<ZooKeeperQuorumServer*> cluster;
180 for (uint32_t i = 0; i < numServers; i++) {
181 cluster.push_back(new ZooKeeperQuorumServer(i, numServers));
182 }
183
184 // Wait until all the servers start, and fail if they don't start within 10
185 // seconds.
186 for (uint32_t i = 0; i < 10; i++) {
187 uint32_t j = 0;
188 for (; j < cluster.size(); j++) {
189 if (cluster[j]->getMode() == "") {
190 // The server hasn't started.
191 sleep(1);
192 break;
193 }
194 }
195 if (j == cluster.size()) {
196 return cluster;
197 }
198 }
199 assert(!"The cluster didn't start for 10 seconds");
200 }
201
202 std::vector<ZooKeeperQuorumServer*> ZooKeeperQuorumServer::
getCluster(uint32_t numServers,ZooKeeperQuorumServer::tConfigPairs configs,std::string env)203 getCluster(uint32_t numServers, ZooKeeperQuorumServer::tConfigPairs configs, std::string env) {
204 std::vector<ZooKeeperQuorumServer*> cluster;
205 std::string config;
206 for (ZooKeeperQuorumServer::tConfigPairs::const_iterator iter = configs.begin(); iter != configs.end(); ++iter) {
207 std::pair<std::string, std::string> pair = *iter;
208 config += (pair.first + "=" + pair.second + "\n");
209 }
210 for (uint32_t i = 0; i < numServers; i++) {
211 cluster.push_back(new ZooKeeperQuorumServer(i, numServers, config, env));
212 }
213
214 // Wait until all the servers start, and fail if they don't start within 10
215 // seconds.
216 for (uint32_t i = 0; i < 10; i++) {
217 uint32_t j = 0;
218 for (; j < cluster.size(); j++) {
219 if (cluster[j]->getMode() == "") {
220 // The server hasn't started.
221 sleep(1);
222 break;
223 }
224 }
225 if (j == cluster.size()) {
226 return cluster;
227 }
228 }
229 assert(!"The cluster didn't start for 10 seconds");
230 }
231