1 /*-------------------------------------------------------------------------
2  * monitor_thread.c
3  *
4  *	Implementation of the thread that manages monitoring
5  *
6  *	Copyright (c) 2011, PostgreSQL Global Development Group
7  *	Author: Christopher Browne, Afilias Canada
8  *-------------------------------------------------------------------------
9  */
10 
11 
12 #include <pthread.h>
13 
14 #include <stdio.h>
15 #include <stdlib.h>
16 #include <string.h>
17 #include <errno.h>
18 #include <signal.h>
19 
20 #include <sys/types.h>
21 #include "types.h"
22 #include "slon.h"
23 
24 #ifndef WIN32
25 #include <unistd.h>
26 #include <sys/time.h>
27 #endif
28 
29 static void stack_init(void);
30 static bool stack_pop(SlonState * current);
31 #ifdef UNUSED
32 static void stack_dump();
33 static void entry_dump(int i, SlonState * tos);
34 #endif
35 static int	initial_stack_size = 6;
36 
37 /* ----------
38  * Global variables
39  * ----------
40  */
41 #define EMPTY_STACK -1
42 static SlonState *mstack;
43 static int	stack_size = EMPTY_STACK;
44 static int	stack_maxlength = 1;
45 static pthread_mutex_t stack_lock = PTHREAD_MUTEX_INITIALIZER;
46 int			monitor_interval;
47 
48 /* ----------
49  * slon_localMonitorThread
50  *
51  * Monitoring thread that periodically flushes stacked-up monitoring requests to database
52  * ----------
53  */
54 void *
monitorThread_main(void * dummy)55 monitorThread_main(void *dummy)
56 {
57 	SlonConn   *conn;
58 	SlonDString beginquery,
59 				commitquery;
60 	SlonDString monquery;
61 
62 	PGconn	   *dbconn;
63 	PGresult   *res;
64 	SlonState	state;
65 	ScheduleStatus rc;
66 
67 	slon_log(SLON_INFO,
68 			 "monitorThread: thread starts\n");
69 	pthread_mutex_lock(&stack_lock);
70 	stack_init();
71 	pthread_mutex_unlock(&stack_lock);
72 
73 
74 	/*
75 	 * Connect to the local database
76 	 */
77 	if ((conn = slon_connectdb(rtcfg_conninfo, "local_monitor")) == NULL)
78 	{
79 		slon_log(SLON_ERROR, "monitorThread: failure to connect to local database\n");
80 	}
81 	else
82 	{
83 		dbconn = conn->dbconn;
84 		slon_log(SLON_DEBUG2, "monitorThread: setup DB conn\n");
85 
86 		/* Start by emptying the sl_components table */
87 		dstring_init(&monquery);
88 		slon_mkquery(&monquery,
89 					 "delete from %s.sl_components;",
90 					 rtcfg_namespace);
91 
92 		res = PQexec(dbconn, dstring_data(&monquery));
93 		if (PQresultStatus(res) != PGRES_COMMAND_OK)
94 		{
95 			slon_log(SLON_ERROR,
96 					 "monitorThread: \"%s\" - %s\n",
97 					 dstring_data(&monquery), PQresultErrorMessage(res));
98 			PQclear(res);
99 			dstring_free(&monquery);
100 			monitor_threads = false;
101 			slon_disconnectdb(conn);
102 			slon_log(SLON_ERROR, "monitorThread: exit monitoring thread\n");
103 			pthread_exit(NULL);
104 			return (void *) 0;
105 		}
106 		else
107 		{
108 			PQclear(res);
109 			dstring_free(&monquery);
110 		}
111 
112 		monitor_state("local_monitor", 0, (pid_t) conn->conn_pid, "thread main loop", 0, "n/a");
113 
114 		/*
115 		 * set up queries that are run in each iteration
116 		 */
117 		dstring_init(&beginquery);
118 		slon_mkquery(&beginquery,
119 					 "start transaction;");
120 
121 		dstring_init(&commitquery);
122 		slon_mkquery(&commitquery, "commit;");
123 
124 		while ((rc = (ScheduleStatus) sched_wait_time(conn, SCHED_WAIT_SOCK_READ, monitor_interval) == SCHED_STATUS_OK))
125 		{
126 			int			qlen;
127 
128 			pthread_mutex_lock(&stack_lock);	/* lock access to stack size */
129 			qlen = stack_size;
130 			pthread_mutex_unlock(&stack_lock);
131 			if (qlen >= 0)
132 			{
133 				res = PQexec(dbconn, dstring_data(&beginquery));
134 				if (PQresultStatus(res) != PGRES_COMMAND_OK)
135 				{
136 					slon_log(SLON_ERROR,
137 							 "monitorThread: \"%s\" - %s",
138 					   dstring_data(&beginquery), PQresultErrorMessage(res));
139 					PQclear(res);
140 					break;
141 				}
142 				PQclear(res);
143 
144 				/*
145 				 * Now, iterate through stack contents, and dump them all to
146 				 * the database
147 				 */
148 				while (stack_pop(&state))
149 				{
150 					dstring_init(&monquery);
151 					slon_mkquery(&monquery,
152 								 "select %s.component_state('%s', %d, %d,",
153 						rtcfg_namespace, state.actor, state.pid, state.node);
154 					if (state.conn_pid > 0)
155 					{
156 						slon_appendquery(&monquery, "%d, ", state.conn_pid);
157 					}
158 					else
159 					{
160 						slon_appendquery(&monquery, "NULL::integer, ");
161 					}
162 					if ((state.activity != 0) && strlen(state.activity) > 0)
163 					{
164 						slon_appendquery(&monquery, "'%s', ", state.activity);
165 					}
166 					else
167 					{
168 						slon_appendquery(&monquery, "NULL::text, ");
169 					}
170 					slon_appendquery(&monquery, "'1970-01-01 0:0:0 UTC'::timestamptz + '%d seconds'::interval, ", time(NULL));
171 					if (state.event > 0)
172 					{
173 						slon_appendquery(&monquery, "%L, ", state.event);
174 					}
175 					else
176 					{
177 						slon_appendquery(&monquery, "NULL::bigint, ");
178 					}
179 					if ((state.event_type != 0) && strlen(state.event_type) > 0)
180 					{
181 						slon_appendquery(&monquery, "'%s');", state.event_type);
182 					}
183 					else
184 					{
185 						slon_appendquery(&monquery, "NULL::text);");
186 					}
187 					if (state.actor != NULL)
188 						free(state.actor);
189 					if (state.activity != NULL)
190 						free(state.activity);
191 					if (state.event_type != NULL)
192 						free(state.event_type);
193 					res = PQexec(dbconn, dstring_data(&monquery));
194 					if (PQresultStatus(res) != PGRES_TUPLES_OK)
195 					{
196 						slon_log(SLON_ERROR,
197 								 "monitorThread: \"%s\" - %s",
198 						 dstring_data(&monquery), PQresultErrorMessage(res));
199 						PQclear(res);
200 						dstring_free(&monquery);
201 						break;
202 					}
203 					PQclear(res);
204 					dstring_free(&monquery);
205 				}
206 
207 				res = PQexec(dbconn, dstring_data(&commitquery));
208 				if (PQresultStatus(res) != PGRES_COMMAND_OK)
209 				{
210 					slon_log(SLON_ERROR,
211 							 "monitorThread: %s - %s\n",
212 							 dstring_data(&commitquery),
213 							 PQresultErrorMessage(res));
214 				}
215 				PQclear(res);
216 			}
217 			if ((rc = (ScheduleStatus) sched_msleep(NULL, monitor_interval)) != SCHED_STATUS_OK)
218 			{
219 				break;
220 			}
221 		}
222 		monitor_state("local_monitor", 0, (pid_t) conn->conn_pid, "just running", 0, "n/a");
223 	}
224 	slon_log(SLON_CONFIG, "monitorThread: exit main loop\n");
225 
226 	dstring_free(&beginquery);
227 	dstring_free(&commitquery);
228 	slon_disconnectdb(conn);
229 
230 	slon_log(SLON_INFO, "monitorThread: thread done\n");
231 	monitor_threads = false;
232 	pthread_exit(NULL);
233 	return (void *) 0;
234 }
235 
236 static void
stack_init(void)237 stack_init(void)
238 {
239 	stack_maxlength = initial_stack_size;
240 	mstack = malloc(sizeof(SlonState) * (stack_maxlength + 1));
241 	if (mstack == NULL)
242 	{
243 		slon_log(SLON_FATAL, "stack_init() - malloc() failure could not allocate %d stack slots\n", stack_maxlength);
244 		slon_retry();
245 	}
246 	else
247 	{
248 		slon_log(SLON_DEBUG2, "stack_init() - initialize stack to size %d\n", stack_maxlength);
249 	}
250 	stack_size = EMPTY_STACK;
251 }
252 
253 void
monitor_state(const char * actor,int node,pid_t conn_pid,const char * activity,int64 event,const char * event_type)254 monitor_state(const char *actor, int node, pid_t conn_pid, /* @null@ */ const char *activity, int64 event, /* @null@ */ const char *event_type)
255 {
256 	size_t		len;
257 	SlonState  *tos;
258 	SlonState  *nstack;
259 	char	   *ns;
260 	pid_t		mypid;
261 
262 	if (!monitor_threads)		/* Don't collect if this thread is shut off */
263 		return;
264 
265 	mypid = getpid();
266 	pthread_mutex_lock(&stack_lock);
267 	if (mstack == NULL)
268 	{
269 		stack_init();
270 	}
271 	if (stack_size >= stack_maxlength)
272 	{
273 		/* Need to reallocate stack */
274 		if (stack_size > 100)
275 		{
276 			slon_log(SLON_WARN, "monitorThread: stack reallocation - size %d > warning threshold of 100.  Stack perhaps isn't getting processed properly by monitoring thread\n", stack_size);
277 		}
278 		stack_maxlength *= 2;
279 
280 		nstack = realloc(mstack, (size_t) ((stack_maxlength + 1) * sizeof(SlonState)));
281 		if (nstack == NULL)
282 		{
283 			slon_log(SLON_FATAL, "stack_init() - malloc() failure could not allocate %d stack slots\n", stack_maxlength);
284 			pthread_mutex_unlock(&stack_lock);
285 			slon_retry();
286 		}
287 		mstack = nstack;
288 	}
289 
290 	/* if actor matches, then we can do an in-place update */
291 	if (stack_size != EMPTY_STACK)
292 	{
293 		tos = mstack + stack_size;
294 		len = strlen(actor);
295 		if (strncmp(actor, tos->actor, len) == 0)
296 		{
297 			if (tos->actor != NULL)
298 			{
299 				free(tos->actor);
300 			}
301 			if (tos->activity != NULL)
302 			{
303 				free(tos->activity);
304 			}
305 			if (tos->event_type != NULL)
306 			{
307 				free(tos->event_type);
308 			}
309 		}
310 		else
311 		{
312 			stack_size++;
313 		}
314 	}
315 	else
316 	{
317 		stack_size++;
318 	}
319 	tos = mstack + stack_size;
320 	tos->pid = mypid;
321 	tos->node = node;
322 	tos->conn_pid = conn_pid;
323 	tos->event = event;
324 
325 /* It might seem somewhat desirable for the database to record
326  *	DB-centred timestamps, unfortunately that would only be the
327  *	correct time if each thread were responsible for stowing its own
328  *	activities in sl_components in the database.  This would multiply
329  *	database activity, and the implementation instead passes requests
330  *	to a single thread that uses a single DB connection to record
331  *	things, with the consequence that timestamps must be captured
332  *	based on the system clock of the slon process. */
333 
334 	tos->start_time = time(NULL);
335 	if (actor != NULL)
336 	{
337 		len = strlen(actor);
338 		ns = malloc(sizeof(char) * len + 1);
339 		if (ns)
340 		{
341 			strncpy(ns, actor, len);
342 			ns[len] = '\0';
343 			tos->actor = ns;
344 		}
345 		else
346 		{
347 			slon_log(SLON_FATAL, "monitor_state - unable to allocate memory for actor (len %d)\n", len);
348 			pthread_mutex_unlock(&stack_lock);
349 			slon_retry();
350 		}
351 	}
352 	else
353 	{
354 		tos->actor = NULL;
355 	}
356 	if (activity != NULL)
357 	{
358 		len = strlen(activity);
359 		ns = malloc(sizeof(char) * len + 1);
360 		if (ns)
361 		{
362 			strncpy(ns, activity, len);
363 			ns[len] = (char) 0;
364 			tos->activity = ns;
365 		}
366 		else
367 		{
368 			slon_log(SLON_FATAL, "monitor_state - unable to allocate memory for activity (len %d)\n", len);
369 			pthread_mutex_unlock(&stack_lock);
370 			slon_retry();
371 		}
372 	}
373 	else
374 	{
375 		tos->activity = NULL;
376 	}
377 	if (event_type != NULL)
378 	{
379 		len = strlen(event_type);
380 		ns = malloc(sizeof(char) * len + 1);
381 		if (ns)
382 		{
383 			strncpy(ns, event_type, len);
384 			ns[len] = (char) 0;
385 			tos->event_type = ns;
386 		}
387 		else
388 		{
389 			slon_log(SLON_FATAL, "monitor_state - unable to allocate memory for event_type (len %d)\n", len);
390 			pthread_mutex_unlock(&stack_lock);
391 			slon_retry();
392 		}
393 	}
394 	else
395 	{
396 		tos->event_type = NULL;
397 	}
398 	pthread_mutex_unlock(&stack_lock);
399 }
400 
401 /* Note that it is the caller's responsibility to free() the contents
402    of strings ->actor, ->activity, ->event_type */
403 static bool
stack_pop(SlonState * qentry)404 stack_pop( /* @out@ */ SlonState * qentry)
405 {
406 	SlonState  *ce = NULL;
407 
408 	pthread_mutex_lock(&stack_lock);
409 	if (stack_size == EMPTY_STACK)
410 	{
411 		pthread_mutex_unlock(&stack_lock);
412 		return false;
413 	}
414 	else
415 	{
416 		ce = mstack + stack_size;
417 		qentry->actor = ce->actor;
418 		qentry->pid = ce->pid;
419 		qentry->node = ce->node;
420 		qentry->conn_pid = ce->conn_pid;
421 		qentry->activity = ce->activity;
422 		qentry->event = ce->event;
423 		qentry->event_type = ce->event_type;
424 		qentry->start_time = ce->start_time;
425 		/* entry_dump(stack_size, qentry); */
426 		stack_size--;
427 		pthread_mutex_unlock(&stack_lock);
428 		return true;
429 	}
430 }
431 
432 #ifdef UNUSED
433 static void
stack_dump()434 stack_dump()
435 {
436 	int			i;
437 	SlonState  *tos;
438 
439 	slon_log(SLON_DEBUG2, "monitorThread: stack_dump()\n");
440 	pthread_mutex_lock(&stack_lock);
441 	for (i = 0; i < stack_size; i++)
442 	{
443 		tos = mstack + i;
444 		entry_dump(i, tos);
445 	}
446 	slon_log(SLON_DEBUG2, "monitorThread: stack_dump done\n");
447 	pthread_mutex_unlock(&stack_lock);
448 }
449 
450 /* Note that this function accesses stack contents, and thus needs to
451  * be guarded by the pthread mutex on stack_lock */
452 
453 static void
entry_dump(int i,SlonState * tos)454 entry_dump(int i, SlonState * tos)
455 {
456 	slon_log(SLON_DEBUG2, "stack[%d]=%d\n",
457 			 i, tos);
458 	slon_log(SLON_DEBUG2, "pid:%d node:%d connpid:%d event:%lld\n",
459 			 tos->pid, tos->node, tos->conn_pid, tos->event);
460 	slon_log(SLON_DEBUG2, "actor[%s] activity[%s] event_type[%s]\n",
461 			 tos->actor, tos->activity, tos->event_type);
462 }
463 #endif /* UNUSED */
464