1 // ----------------------------------------------------------------------------
2 //	qrunner.h
3 //
4 // Copyright (C) 2007-2009
5 //		Stelios Bounanos, M0GLD
6 //
7 // This file is part of fldigi.
8 //
9 // fldigi is free software; you can redistribute it and/or modify
10 // it under the terms of the GNU General Public License as published by
11 // the Free Software Foundation; either version 3 of the License, or
12 // (at your option) any later version.
13 //
14 // fldigi is distributed in the hope that it will be useful,
15 // but WITHOUT ANY WARRANTY; without even the implied warranty of
16 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17 // GNU General Public License for more details.
18 //
19 // You should have received a copy of the GNU General Public License
20 // along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 // ----------------------------------------------------------------------------
22 
23 #ifndef QRUNNER_H_
24 #define QRUNNER_H_
25 
26 #ifndef NDEBUG
27 #    include <inttypes.h>
28 #    include "debug.h"
29 #endif
30 
31 #include <unistd.h>
32 #include <cerrno>
33 #include <stdexcept>
34 #include <cstring>
35 #include <string>
36 
// Select the bind() implementation offered by the toolchain and re-export it
// as qrbind::bind, so the REQ_* macros below can use one spelling everywhere.
// HAVE_STD_BIND / HAVE_STD_TR1_BIND are presumably provided by the build
// configuration -- they are not defined in this header.
//
// The namespace bodies are closed with a plain '}' (no trailing ';'): the
// extra semicolon is an empty declaration that pedantic pre-C++11 builds
// reject, and this header explicitly supports the TR1 (C++03) path.
#if HAVE_STD_BIND
#   include <functional>
namespace qrbind {
    using std::bind;
}
#elif HAVE_STD_TR1_BIND
#   include <tr1/functional>
namespace qrbind {
    using std::tr1::bind;
}
#else
#   error need std::bind or std::tr1::bind
#endif
50 
51 #include "threads.h"
52 #include "qrunner/fqueue.h"
53 
// Symbols declared here but defined elsewhere in the project.
// write_message() and sztid are not referenced anywhere in this header.
extern void write_message(int line_no, const char *func, const char *msg);
// Called by qrunner::request() with the calling thread's id and the
// typeid() name of the queued functor when QRUNNER_DEBUG is true.
extern void qrunner_debug(int tid, const char * name);
// NOTE(review): presumably a table of thread-id names indexed by thread id --
// confirm against the definition.
extern const char *sztid[];
57 
// Platform shim for the descriptor pair used by qrunner.
// Everywhere except MinGW the descriptors are plain ints driven with
// read()/write(); on MinGW they are winsock SOCKETs driven with
// recv()/send() (flags = 0), with the buffer cast to the char pointer
// type those functions expect.
#ifndef __MINGW32__
typedef int SOCKET;
#  define QRUNNER_READ(fd__, buf__, len__) read(fd__, buf__, len__)
#  define QRUNNER_WRITE(fd__, buf__, len__) write(fd__, buf__, len__)
#else
// NOTE(review): _WINSOCKAPI_ is presumably undefined so that <winsock2.h> is
// not suppressed by an earlier winsock v1 guard -- confirm.
#  undef _WINSOCKAPI_
#  include <winsock2.h>
#  define QRUNNER_READ(fd__, buf__, len__) recv(fd__, (char*)buf__, len__, 0)
#  define QRUNNER_WRITE(fd__, buf__, len__) send(fd__, (const char*)buf__, len__, 0)
#endif
68 
/**
 * Exception thrown by the qrunner request path.
 *
 * The message is either supplied directly by the caller or derived from an
 * errno-style code through strerror().
 */
class qexception : public std::exception
{
public:
	/**
	 * Build the exception from a caller-supplied message.
	 * A null pointer is stored as an empty message; constructing a
	 * std::string from a null pointer would be undefined behaviour.
	 */
	qexception(const char *msg_) : msg(msg_ ? msg_ : "") { }

	/** Build the exception from an errno-style code; the text is strerror(e). */
	qexception(int e) : msg(strerror(e)) { }

	~qexception() throw() { }

	/**
	 * Return the stored message.
	 *
	 * Side effect: the message is also passed to perror(), which prints it
	 * to stderr followed by the text for the *current* errno.
	 * NOTE(review): a what() with output side effects is unusual, and errno
	 * may no longer match the failure by the time this is called; kept
	 * as-is because callers may rely on the stderr output.
	 */
	const char *what(void) const throw() { perror(msg.c_str()); return msg.c_str(); }
private:
	std::string msg;
};
79 
/**
 * Function object that signals a condition variable.
 *
 * Holds non-owning pointers to a mutex and a condition variable; invoking
 * the object locks the mutex, signals the condition, and unlocks again.
 * qrunner::request_sync() queues one of these and waits on the same
 * mutex/condition pair.
 */
struct fsignal
{
	typedef void result_type;
	pthread_mutex_t* m;
	pthread_cond_t* c;

	/** Remember the mutex/condition pair to signal; neither is owned. */
	fsignal(pthread_mutex_t* mutex, pthread_cond_t* cond)
		: m(mutex),
		  c(cond)
	{
	}

	/** Signal the condition variable while holding the mutex. */
	void operator()(void) const
	{
		pthread_mutex_lock(m);
		pthread_cond_signal(c);
		pthread_mutex_unlock(m);
	}
};
94 
/**
 * Function object that does nothing when invoked.
 * REQ_FLUSH passes one to qrunner::request_sync().
 */
struct nop
{
	typedef void result_type;

	/** Intentionally empty. */
	void operator()(void) const
	{
	}
};
100 
101 class qrunner
102 {
103 private:
104 #define QRUNNER_DEBUG false
105 public:
106 	qrunner();
107 	~qrunner();
108 
109 	void attach(void);
110 	void attach(int, std::string);
111 	void detach(void);
112 
113 	template <typename F>
request(const F & f)114 	bool request(const F& f)
115 	{
116 // added mutex here and removed it from qrunner_debug
117 		static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
118 		guard_lock reqlock(&m);
119 
120 		if (fifo->push(f)) {
121 			int resp = QRUNNER_WRITE(pfd[1], "", 1);
122 			if (QRUNNER_DEBUG)
123 				qrunner_debug(GET_THREAD_ID(), typeid(F).name());
124 #ifdef NDEBUG
125 			if (unlikely(resp != 1)) {
126 				throw qexception(errno);
127 			}
128 #else
129 			assert(resp);
130 #endif
131 			return true;
132 		}
133 		return false;
134 	}
135 
136 	template <typename F>
request_sync(const F & f)137 	bool request_sync(const F& f)
138 	{
139 		if (!attached)
140 			return request(f);
141 
142 		for (;;) {
143 			if (request(f))
144 				break;
145 			sched_yield();
146 		}
147 		static pthread_mutex_t m = PTHREAD_MUTEX_INITIALIZER;
148 		static pthread_cond_t c = PTHREAD_COND_INITIALIZER;
149 		fsignal s(&m, &c);
150 		pthread_mutex_lock(&m);
151 		for (;;) {
152 			if (request(s))
153 				break;
154 			sched_yield();
155 		}
156 		pthread_cond_wait(&c, &m);
157 		pthread_mutex_unlock(&m);
158 
159 		return true;
160 	}
161 
162 	static void execute(int fd, void *arg);
163 	void flush(void);
164 
drop(void)165 	void drop(void) { fifo->drop(); }
size(void)166 	size_t size(void) { return fifo->size(); }
id_str(void)167 	std::string id_str(void) { return _id_string; }
168 
169 protected:
170 	fqueue *fifo;
171 	SOCKET pfd[2];
172 	bool attached;
173 	int _id_no;
174 	std::string _id_string;
175 	bool inprog;
176 public:
177 	bool drop_flag;
178 };
179 
180 
// Table of qrunner pointers, defined elsewhere; the REQ_* macros below index
// it with GET_THREAD_ID() (and REQ_FLUSH also with an explicit index).
extern qrunner *cbq[NUM_QRUNNER_THREADS];
182 
// Request macros.
//
// Each REQ_* macro binds its arguments with qrbind::bind(). When the calling
// thread is FLMAIN_TID the bound call is invoked on the spot; otherwise it is
// handed to the calling thread's qrunner (cbq[GET_THREAD_ID()]).
//
//   REQ / REQ_ASYNC            queue with request()
//   REQ_SYNC                   queue with request_sync()
//   REQ_DROP / REQ_ASYNC_DROP  as REQ_ASYNC, but skipped while drop_flag is set
//   REQ_SYNC_DROP              as REQ_SYNC, but skipped while drop_flag is set
//   REQ_FLUSH(t_)              see below
//   QRUNNER_DROP(v_)           set the calling thread's drop_flag
#if BENCHMARK_MODE
// Benchmark builds turn every request macro into a no-op; the arguments are
// never evaluated. Every macro defined in the #else branch has a stub here so
// that code using any of them still compiles in benchmark mode.
#define REQ(...) ((void)0)
#define REQ_DROP(...) ((void)0)
#define REQ_ASYNC(...) ((void)0)
#define REQ_ASYNC_DROP(...) ((void)0)
#define REQ_SYNC(...) ((void)0)
#define REQ_SYNC_DROP(...) ((void)0)
#define REQ_FLUSH(...) ((void)0)
#define QRUNNER_DROP(...) ((void)0)
#else

#define REQ REQ_ASYNC
#define REQ_DROP REQ_ASYNC_DROP

#define REQ_ASYNC(...)													\
	do {																\
		if (GET_THREAD_ID() != FLMAIN_TID)								\
			cbq[GET_THREAD_ID()]->request(qrbind::bind(__VA_ARGS__));	\
		else															\
			qrbind::bind(__VA_ARGS__)();								\
	} while (0)

#define REQ_SYNC(...)														\
	do {																	\
		if (GET_THREAD_ID() != FLMAIN_TID)									\
			cbq[GET_THREAD_ID()]->request_sync(qrbind::bind(__VA_ARGS__));	\
		else																\
			qrbind::bind(__VA_ARGS__)();									\
	} while (0)

// The `break` leaves the enclosing do { } while (0), i.e. skips the request.
#define REQ_ASYNC_DROP(...)												\
	do {																\
		if (GET_THREAD_ID() != FLMAIN_TID) {							\
			if (unlikely(cbq[GET_THREAD_ID()]->drop_flag))				\
				break;													\
			cbq[GET_THREAD_ID()]->request(qrbind::bind(__VA_ARGS__));	\
		}																\
		else															\
			qrbind::bind(__VA_ARGS__)();								\
	} while (0)

// The `break` leaves the enclosing do { } while (0), i.e. skips the request.
#define REQ_SYNC_DROP(...)													\
	do {																	\
		if (GET_THREAD_ID() != FLMAIN_TID) {								\
			if (unlikely(cbq[GET_THREAD_ID()]->drop_flag))					\
				break;														\
			cbq[GET_THREAD_ID()]->request_sync(qrbind::bind(__VA_ARGS__));	\
		}																	\
		else																\
			qrbind::bind(__VA_ARGS__)();									\
	} while (0)

// Off the main thread: queue a nop with request_sync() on the caller's own
// runner. On the main thread: flush cbq[t_] when t_ is a valid index,
// otherwise flush every runner. t_ is parenthesized so that an expression
// argument (e.g. a conditional) cannot re-associate with the '<'; note that
// it may be evaluated twice.
#define REQ_FLUSH(t_)										\
	do {													\
		if (GET_THREAD_ID() != FLMAIN_TID)					\
			cbq[GET_THREAD_ID()]->request_sync(nop());		\
		else if ((t_) < NUM_QRUNNER_THREADS)				\
			cbq[(t_)]->flush();								\
		else												\
			for (int i = 0; i < NUM_QRUNNER_THREADS; i++)	\
				cbq[i]->flush();							\
	} while (0)

// Sets drop_flag on the calling thread's runner; does nothing on the main
// thread.
#define QRUNNER_DROP(v_)							\
	do {											\
		if ((GET_THREAD_ID() != FLMAIN_TID))		\
			cbq[GET_THREAD_ID()]->drop_flag = (v_);	\
	} while (0)

#endif // BENCHMARK_MODE
250 
251 #endif // QRUNNER_H_
252 
253 // Local Variables:
254 // mode: c++
255 // c-file-style: "linux"
256 // End:
257