1 /* vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2 *
3 * Gearmand client and server library.
4 *
5 * Copyright (C) 2011 Data Differential, http://datadifferential.com/
6 * Copyright (C) 2008 Brian Aker, Eric Day
7 * All rights reserved.
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions are
11 * met:
12 *
13 * * Redistributions of source code must retain the above copyright
14 * notice, this list of conditions and the following disclaimer.
15 *
16 * * Redistributions in binary form must reproduce the above
17 * copyright notice, this list of conditions and the following disclaimer
18 * in the documentation and/or other materials provided with the
19 * distribution.
20 *
21 * * The names of its contributors may not be used to endorse or
22 * promote products derived from this software without specific prior
23 * written permission.
24 *
25 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28 * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29 * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32 * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33 * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36 *
37 */
38
39
40 /**
41 * @file
42 * @brief Blob slap worker utility
43 */
44
45 #include "gear_config.h"
46 #include <benchmark/benchmark.h>
47 #include <boost/program_options.hpp>
48 #include <cerrno>
49 #include <cstdio>
50 #include <climits>
51 #include <iostream>
52 #include <vector>
53 #include "util/daemon.hpp"
54 #include "util/logfile.hpp"
55 #include "util/pidfile.hpp"
56 #include "util/signal.hpp"
57 #include "util/string.hpp"
58
59 using namespace datadifferential;
60
61 static void *worker_fn(gearman_job_st *job, void *context,
62 size_t *result_size, gearman_return_t *ret_ptr);
63
shutdown_fn(gearman_job_st *,void *)64 static gearman_return_t shutdown_fn(gearman_job_st*, void* /* context */)
65 {
66 return GEARMAN_SHUTDOWN;
67 }
68
ping_fn(gearman_job_st *,void *)69 static gearman_return_t ping_fn(gearman_job_st*, void* /* context */)
70 {
71 return GEARMAN_SUCCESS;
72 }
73
74
main(int args,char * argv[])75 int main(int args, char *argv[])
76 {
77 gearman_benchmark_st benchmark;
78 bool opt_daemon;
79 bool opt_chunk;
80 bool opt_status;
81 bool opt_unique;
82 std::string pid_file;
83 std::string log_file;
84 int32_t timeout;
85 uint32_t count= UINT_MAX;
86 in_port_t port;
87 std::string host;
88 std::vector<std::string> functions;
89 std::string verbose_string;
90 boost::program_options::options_description desc("Options");
91 desc.add_options()
92 ("help", "Options related to the program.")
93 ("host,h", boost::program_options::value<std::string>(&host)->default_value("localhost"),"Connect to the host")
94 ("port,p", boost::program_options::value<in_port_t>(&port)->default_value(GEARMAN_DEFAULT_TCP_PORT), "Port number use for connection")
95 ("count,c", boost::program_options::value<uint32_t>(&count)->default_value(0), "Number of jobs to run before exiting")
96 ("timeout,u", boost::program_options::value<int32_t>(&timeout)->default_value(-1), "Timeout in milliseconds")
97 ("chunk", boost::program_options::bool_switch(&opt_chunk)->default_value(false), "Send result back in data chunks")
98 ("status,s", boost::program_options::bool_switch(&opt_status)->default_value(false), "Send status updates and sleep while running job")
99 ("unique,u", boost::program_options::bool_switch(&opt_unique)->default_value(false), "When grabbing jobs, grab the uniqie id")
100 ("daemon,d", boost::program_options::bool_switch(&opt_daemon)->default_value(false), "Daemonize")
101 ("function,f", boost::program_options::value(&functions), "Function to use.")
102 ("verbose,v", boost::program_options::value(&verbose_string)->default_value("v"), "Increase verbosity level by one.")
103 ("pid-file", boost::program_options::value(&pid_file), "File to write process ID out to.")
104 ("log-file", boost::program_options::value(&log_file), "Create a log file.")
105 ;
106
107 boost::program_options::variables_map vm;
108 try
109 {
110 boost::program_options::store(boost::program_options::parse_command_line(args, argv, desc), vm);
111 boost::program_options::notify(vm);
112 }
113 catch(std::exception &e)
114 {
115 std::cout << e.what() << std::endl;
116 return EXIT_FAILURE;
117 }
118
119 if (vm.count("help"))
120 {
121 std::cout << desc << std::endl;
122 return EXIT_SUCCESS;
123 }
124
125 if (opt_daemon)
126 {
127 util::daemonize(false, true);
128 }
129
130 util::Pidfile _pid_file(pid_file);
131
132 if (pid_file.empty() == false)
133 {
134 if (_pid_file.create() == false)
135 {
136 std::cerr << _pid_file.error_message().c_str();
137 return EXIT_FAILURE;
138 }
139 }
140
141 if (not log_file.empty())
142 {
143 FILE *file= fopen(log_file.c_str(), "w+");
144 if (file == NULL)
145 {
146 std::cerr << "Unable to open:" << log_file << "(" << strerror(errno) << ")" << std::endl;
147 return EXIT_FAILURE;
148 }
149 fclose(file);
150
151 // We let the error from this happen later (if one was to occur)
152 unlink(log_file.c_str());
153 }
154
155 gearman_worker_st *worker;
156 if (not (worker= gearman_worker_create(NULL)))
157 {
158 std::cerr << "Failed to allocate worker" << std::endl;
159 return EXIT_FAILURE;
160 }
161
162 if (gearman_failed(gearman_worker_add_server(worker, host.c_str(), port)))
163 {
164 std::cerr << "Failed while adding server " << host << ":" << port << " :" << gearman_worker_error(worker) << std::endl;
165 return EXIT_FAILURE;
166 }
167
168 benchmark.verbose= static_cast<uint8_t>(verbose_string.length());
169
170 if (opt_daemon)
171 {
172 util::daemon_is_ready(benchmark.verbose == 0);
173 }
174
175 util::SignalThread signal(true);
176 util::Logfile log(log_file);
177
178 if (not log.open())
179 {
180 std::cerr << "Could not open logfile:" << log_file << std::endl;
181 return EXIT_FAILURE;
182 }
183
184 if (not signal.setup())
185 {
186 log.log() << "Failed signal.setup()" << std::endl;
187 return EXIT_FAILURE;
188 }
189
190 gearman_function_t shutdown_function= gearman_function_create(shutdown_fn);
191 if (gearman_failed(gearman_worker_define_function(worker,
192 util_literal_param("shutdown"),
193 shutdown_function,
194 0, 0)))
195 {
196 log.log() << "Failed to add shutdown function: " << gearman_worker_error(worker) << std::endl;
197 return EXIT_FAILURE;
198 }
199
200 gearman_function_t ping_function= gearman_function_create(ping_fn);
201 if (gearman_failed(gearman_worker_define_function(worker,
202 util_literal_param("blobslap_worker_ping"),
203 ping_function,
204 0, 0)))
205 {
206 log.log() << "Failed to add blobslap_worker_ping function: " << gearman_worker_error(worker) << std::endl;
207 return EXIT_FAILURE;
208 }
209
210 if (functions.empty() == false)
211 {
212 for (std::vector<std::string>::iterator iter= functions.begin(); iter != functions.end(); ++iter)
213 {
214 if (gearman_failed(gearman_worker_add_function(worker,
215 (*iter).c_str(), 0,
216 worker_fn, &benchmark)))
217 {
218 log.log() << "Failed to add default function: " << gearman_worker_error(worker) << std::endl;
219 return EXIT_FAILURE;
220 }
221 }
222 }
223 else
224 {
225 if (gearman_failed(gearman_worker_add_function(worker,
226 GEARMAN_BENCHMARK_DEFAULT_FUNCTION, 0,
227 worker_fn, &benchmark)))
228 {
229 log.log() << "Failed to add default function: " << gearman_worker_error(worker) << std::endl;
230 return EXIT_FAILURE;
231 }
232 }
233
234 gearman_worker_set_timeout(worker, timeout);
235
236 do
237 {
238 gearman_return_t rc= gearman_worker_work(worker);
239
240 if (rc == GEARMAN_SHUTDOWN)
241 {
242 if (benchmark.verbose > 0)
243 {
244 log.log() << "shutdown" << std::endl;
245 }
246 break;
247 }
248 else if (gearman_failed(rc))
249 {
250 log.log() << "gearman_worker_work(): " << gearman_worker_error(worker) << std::endl;
251 break;
252 }
253
254 count--;
255 } while(count and (not signal.is_shutdown()));
256
257 gearman_worker_free(worker);
258
259 return EXIT_SUCCESS;
260 }
261
worker_fn(gearman_job_st * job,void * context,size_t *,gearman_return_t * ret_ptr)262 static void *worker_fn(gearman_job_st *job, void *context,
263 size_t *, gearman_return_t *ret_ptr)
264 {
265 gearman_benchmark_st *benchmark= static_cast<gearman_benchmark_st *>(context);
266
267 if (benchmark->verbose > 0)
268 {
269 benchmark_check_time(benchmark);
270 }
271
272 if (benchmark->verbose > 1)
273 {
274 std::cout << "Job=%s (" << gearman_job_workload_size(job) << ")" << std::endl;
275 }
276
277 *ret_ptr= GEARMAN_SUCCESS;
278
279 return NULL;
280 }
281