1 /*  vim:expandtab:shiftwidth=2:tabstop=2:smarttab:
2  *
3  *  Gearmand client and server library.
4  *
5  *  Copyright (C) 2011 Data Differential, http://datadifferential.com/
6  *  Copyright (C) 2008 Brian Aker, Eric Day
7  *  All rights reserved.
8  *
9  *  Redistribution and use in source and binary forms, with or without
10  *  modification, are permitted provided that the following conditions are
11  *  met:
12  *
13  *      * Redistributions of source code must retain the above copyright
14  *  notice, this list of conditions and the following disclaimer.
15  *
16  *      * Redistributions in binary form must reproduce the above
17  *  copyright notice, this list of conditions and the following disclaimer
18  *  in the documentation and/or other materials provided with the
19  *  distribution.
20  *
21  *      * The names of its contributors may not be used to endorse or
22  *  promote products derived from this software without specific prior
23  *  written permission.
24  *
25  *  THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
26  *  "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
27  *  LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
28  *  A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
29  *  OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
30  *  SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
31  *  LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
32  *  DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
33  *  THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
34  *  (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
35  *  OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
36  *
37  */
38 
39 
40 /**
41  * @file
42  * @brief Blob slap worker utility
43  */
44 
45 #include "gear_config.h"
46 #include <benchmark/benchmark.h>
47 #include <boost/program_options.hpp>
48 #include <cerrno>
49 #include <cstdio>
50 #include <climits>
51 #include <iostream>
52 #include <vector>
53 #include "util/daemon.hpp"
54 #include "util/logfile.hpp"
55 #include "util/pidfile.hpp"
56 #include "util/signal.hpp"
57 #include "util/string.hpp"
58 
59 using namespace datadifferential;
60 
61 static void *worker_fn(gearman_job_st *job, void *context,
62                        size_t *result_size, gearman_return_t *ret_ptr);
63 
shutdown_fn(gearman_job_st *,void *)64 static gearman_return_t shutdown_fn(gearman_job_st*, void* /* context */)
65 {
66   return GEARMAN_SHUTDOWN;
67 }
68 
ping_fn(gearman_job_st *,void *)69 static gearman_return_t ping_fn(gearman_job_st*, void* /* context */)
70 {
71   return GEARMAN_SUCCESS;
72 }
73 
74 
main(int args,char * argv[])75 int main(int args, char *argv[])
76 {
77   gearman_benchmark_st benchmark;
78   bool opt_daemon;
79   bool opt_chunk;
80   bool opt_status;
81   bool opt_unique;
82   std::string pid_file;
83   std::string log_file;
84   int32_t timeout;
85   uint32_t count= UINT_MAX;
86   in_port_t port;
87   std::string host;
88   std::vector<std::string> functions;
89   std::string verbose_string;
90   boost::program_options::options_description desc("Options");
91   desc.add_options()
92     ("help", "Options related to the program.")
93     ("host,h", boost::program_options::value<std::string>(&host)->default_value("localhost"),"Connect to the host")
94     ("port,p", boost::program_options::value<in_port_t>(&port)->default_value(GEARMAN_DEFAULT_TCP_PORT), "Port number use for connection")
95     ("count,c", boost::program_options::value<uint32_t>(&count)->default_value(0), "Number of jobs to run before exiting")
96     ("timeout,u", boost::program_options::value<int32_t>(&timeout)->default_value(-1), "Timeout in milliseconds")
97     ("chunk", boost::program_options::bool_switch(&opt_chunk)->default_value(false), "Send result back in data chunks")
98     ("status,s", boost::program_options::bool_switch(&opt_status)->default_value(false), "Send status updates and sleep while running job")
99     ("unique,u", boost::program_options::bool_switch(&opt_unique)->default_value(false), "When grabbing jobs, grab the uniqie id")
100     ("daemon,d", boost::program_options::bool_switch(&opt_daemon)->default_value(false), "Daemonize")
101     ("function,f", boost::program_options::value(&functions), "Function to use.")
102     ("verbose,v", boost::program_options::value(&verbose_string)->default_value("v"), "Increase verbosity level by one.")
103     ("pid-file", boost::program_options::value(&pid_file), "File to write process ID out to.")
104     ("log-file", boost::program_options::value(&log_file), "Create a log file.")
105             ;
106 
107   boost::program_options::variables_map vm;
108   try
109   {
110     boost::program_options::store(boost::program_options::parse_command_line(args, argv, desc), vm);
111     boost::program_options::notify(vm);
112   }
113   catch(std::exception &e)
114   {
115     std::cout << e.what() << std::endl;
116     return EXIT_FAILURE;
117   }
118 
119   if (vm.count("help"))
120   {
121     std::cout << desc << std::endl;
122     return EXIT_SUCCESS;
123   }
124 
125   if (opt_daemon)
126   {
127     util::daemonize(false, true);
128   }
129 
130   util::Pidfile _pid_file(pid_file);
131 
132   if (pid_file.empty() == false)
133   {
134     if (_pid_file.create() == false)
135     {
136       std::cerr << _pid_file.error_message().c_str();
137       return EXIT_FAILURE;
138     }
139   }
140 
141   if (not log_file.empty())
142   {
143     FILE *file= fopen(log_file.c_str(), "w+");
144     if (file == NULL)
145     {
146       std::cerr << "Unable to open:" << log_file << "(" << strerror(errno) << ")" << std::endl;
147       return EXIT_FAILURE;
148     }
149     fclose(file);
150 
151     // We let the error from this happen later (if one was to occur)
152     unlink(log_file.c_str());
153   }
154 
155   gearman_worker_st *worker;
156   if (not (worker= gearman_worker_create(NULL)))
157   {
158     std::cerr << "Failed to allocate worker" << std::endl;
159     return EXIT_FAILURE;
160   }
161 
162   if (gearman_failed(gearman_worker_add_server(worker, host.c_str(), port)))
163   {
164     std::cerr << "Failed while adding server " << host << ":" << port << " :" << gearman_worker_error(worker) << std::endl;
165     return EXIT_FAILURE;
166   }
167 
168   benchmark.verbose= static_cast<uint8_t>(verbose_string.length());
169 
170   if (opt_daemon)
171   {
172     util::daemon_is_ready(benchmark.verbose == 0);
173   }
174 
175   util::SignalThread signal(true);
176   util::Logfile log(log_file);
177 
178   if (not log.open())
179   {
180     std::cerr << "Could not open logfile:" << log_file << std::endl;
181     return EXIT_FAILURE;
182   }
183 
184   if (not signal.setup())
185   {
186     log.log() << "Failed signal.setup()" << std::endl;
187     return EXIT_FAILURE;
188   }
189 
190   gearman_function_t shutdown_function= gearman_function_create(shutdown_fn);
191   if (gearman_failed(gearman_worker_define_function(worker,
192                                                     util_literal_param("shutdown"),
193                                                     shutdown_function,
194                                                     0, 0)))
195   {
196     log.log() << "Failed to add shutdown function: " << gearman_worker_error(worker) << std::endl;
197     return EXIT_FAILURE;
198   }
199 
200   gearman_function_t ping_function= gearman_function_create(ping_fn);
201   if (gearman_failed(gearman_worker_define_function(worker,
202                                                     util_literal_param("blobslap_worker_ping"),
203                                                     ping_function,
204                                                     0, 0)))
205   {
206     log.log() << "Failed to add blobslap_worker_ping function: " << gearman_worker_error(worker) << std::endl;
207     return EXIT_FAILURE;
208   }
209 
210   if (functions.empty() == false)
211   {
212     for (std::vector<std::string>::iterator iter= functions.begin(); iter != functions.end(); ++iter)
213     {
214       if (gearman_failed(gearman_worker_add_function(worker,
215                                                      (*iter).c_str(), 0,
216                                                      worker_fn, &benchmark)))
217       {
218         log.log() << "Failed to add default function: " << gearman_worker_error(worker) << std::endl;
219         return EXIT_FAILURE;
220       }
221     }
222   }
223   else
224   {
225     if (gearman_failed(gearman_worker_add_function(worker,
226                                                    GEARMAN_BENCHMARK_DEFAULT_FUNCTION, 0,
227                                                    worker_fn, &benchmark)))
228     {
229       log.log() << "Failed to add default function: " << gearman_worker_error(worker) << std::endl;
230       return EXIT_FAILURE;
231     }
232   }
233 
234   gearman_worker_set_timeout(worker, timeout);
235 
236   do
237   {
238     gearman_return_t rc= gearman_worker_work(worker);
239 
240     if (rc == GEARMAN_SHUTDOWN)
241     {
242       if (benchmark.verbose > 0)
243       {
244         log.log() << "shutdown" << std::endl;
245       }
246       break;
247     }
248     else if (gearman_failed(rc))
249     {
250       log.log() << "gearman_worker_work(): " << gearman_worker_error(worker) << std::endl;
251       break;
252     }
253 
254     count--;
255   } while(count and (not signal.is_shutdown()));
256 
257   gearman_worker_free(worker);
258 
259   return EXIT_SUCCESS;
260 }
261 
worker_fn(gearman_job_st * job,void * context,size_t *,gearman_return_t * ret_ptr)262 static void *worker_fn(gearman_job_st *job, void *context,
263                        size_t *, gearman_return_t *ret_ptr)
264 {
265   gearman_benchmark_st *benchmark= static_cast<gearman_benchmark_st *>(context);
266 
267   if (benchmark->verbose > 0)
268   {
269     benchmark_check_time(benchmark);
270   }
271 
272   if (benchmark->verbose > 1)
273   {
274     std::cout << "Job=%s (" << gearman_job_workload_size(job) << ")" << std::endl;
275   }
276 
277   *ret_ptr= GEARMAN_SUCCESS;
278 
279   return NULL;
280 }
281