1 /*-------------------------------------------------------------------------
2 * monitor_thread.c
3 *
4 * Implementation of the thread that manages monitoring
5 *
6 * Copyright (c) 2011, PostgreSQL Global Development Group
7 * Author: Christopher Browne, Afilias Canada
8 *-------------------------------------------------------------------------
9 */
10
11
12 #include <pthread.h>
13
14 #include <stdio.h>
15 #include <stdlib.h>
16 #include <string.h>
17 #include <errno.h>
18 #include <signal.h>
19
20 #include <sys/types.h>
21 #include "types.h"
22 #include "slon.h"
23
24 #ifndef WIN32
25 #include <unistd.h>
26 #include <sys/time.h>
27 #endif
28
29 static void stack_init(void);
30 static bool stack_pop(SlonState * current);
31 #ifdef UNUSED
32 static void stack_dump();
33 static void entry_dump(int i, SlonState * tos);
34 #endif
35 static int initial_stack_size = 6;
36
37 /* ----------
38 * Global variables
39 * ----------
40 */
41 #define EMPTY_STACK -1
42 static SlonState *mstack;
43 static int stack_size = EMPTY_STACK;
44 static int stack_maxlength = 1;
45 static pthread_mutex_t stack_lock = PTHREAD_MUTEX_INITIALIZER;
46 int monitor_interval;
47
48 /* ----------
49 * slon_localMonitorThread
50 *
51 * Monitoring thread that periodically flushes stacked-up monitoring requests to database
52 * ----------
53 */
54 void *
monitorThread_main(void * dummy)55 monitorThread_main(void *dummy)
56 {
57 SlonConn *conn;
58 SlonDString beginquery,
59 commitquery;
60 SlonDString monquery;
61
62 PGconn *dbconn;
63 PGresult *res;
64 SlonState state;
65 ScheduleStatus rc;
66
67 slon_log(SLON_INFO,
68 "monitorThread: thread starts\n");
69 pthread_mutex_lock(&stack_lock);
70 stack_init();
71 pthread_mutex_unlock(&stack_lock);
72
73
74 /*
75 * Connect to the local database
76 */
77 if ((conn = slon_connectdb(rtcfg_conninfo, "local_monitor")) == NULL)
78 {
79 slon_log(SLON_ERROR, "monitorThread: failure to connect to local database\n");
80 }
81 else
82 {
83 dbconn = conn->dbconn;
84 slon_log(SLON_DEBUG2, "monitorThread: setup DB conn\n");
85
86 /* Start by emptying the sl_components table */
87 dstring_init(&monquery);
88 slon_mkquery(&monquery,
89 "delete from %s.sl_components;",
90 rtcfg_namespace);
91
92 res = PQexec(dbconn, dstring_data(&monquery));
93 if (PQresultStatus(res) != PGRES_COMMAND_OK)
94 {
95 slon_log(SLON_ERROR,
96 "monitorThread: \"%s\" - %s\n",
97 dstring_data(&monquery), PQresultErrorMessage(res));
98 PQclear(res);
99 dstring_free(&monquery);
100 monitor_threads = false;
101 slon_disconnectdb(conn);
102 slon_log(SLON_ERROR, "monitorThread: exit monitoring thread\n");
103 pthread_exit(NULL);
104 return (void *) 0;
105 }
106 else
107 {
108 PQclear(res);
109 dstring_free(&monquery);
110 }
111
112 monitor_state("local_monitor", 0, (pid_t) conn->conn_pid, "thread main loop", 0, "n/a");
113
114 /*
115 * set up queries that are run in each iteration
116 */
117 dstring_init(&beginquery);
118 slon_mkquery(&beginquery,
119 "start transaction;");
120
121 dstring_init(&commitquery);
122 slon_mkquery(&commitquery, "commit;");
123
124 while ((rc = (ScheduleStatus) sched_wait_time(conn, SCHED_WAIT_SOCK_READ, monitor_interval) == SCHED_STATUS_OK))
125 {
126 int qlen;
127
128 pthread_mutex_lock(&stack_lock); /* lock access to stack size */
129 qlen = stack_size;
130 pthread_mutex_unlock(&stack_lock);
131 if (qlen >= 0)
132 {
133 res = PQexec(dbconn, dstring_data(&beginquery));
134 if (PQresultStatus(res) != PGRES_COMMAND_OK)
135 {
136 slon_log(SLON_ERROR,
137 "monitorThread: \"%s\" - %s",
138 dstring_data(&beginquery), PQresultErrorMessage(res));
139 PQclear(res);
140 break;
141 }
142 PQclear(res);
143
144 /*
145 * Now, iterate through stack contents, and dump them all to
146 * the database
147 */
148 while (stack_pop(&state))
149 {
150 dstring_init(&monquery);
151 slon_mkquery(&monquery,
152 "select %s.component_state('%s', %d, %d,",
153 rtcfg_namespace, state.actor, state.pid, state.node);
154 if (state.conn_pid > 0)
155 {
156 slon_appendquery(&monquery, "%d, ", state.conn_pid);
157 }
158 else
159 {
160 slon_appendquery(&monquery, "NULL::integer, ");
161 }
162 if ((state.activity != 0) && strlen(state.activity) > 0)
163 {
164 slon_appendquery(&monquery, "'%s', ", state.activity);
165 }
166 else
167 {
168 slon_appendquery(&monquery, "NULL::text, ");
169 }
170 slon_appendquery(&monquery, "'1970-01-01 0:0:0 UTC'::timestamptz + '%d seconds'::interval, ", time(NULL));
171 if (state.event > 0)
172 {
173 slon_appendquery(&monquery, "%L, ", state.event);
174 }
175 else
176 {
177 slon_appendquery(&monquery, "NULL::bigint, ");
178 }
179 if ((state.event_type != 0) && strlen(state.event_type) > 0)
180 {
181 slon_appendquery(&monquery, "'%s');", state.event_type);
182 }
183 else
184 {
185 slon_appendquery(&monquery, "NULL::text);");
186 }
187 if (state.actor != NULL)
188 free(state.actor);
189 if (state.activity != NULL)
190 free(state.activity);
191 if (state.event_type != NULL)
192 free(state.event_type);
193 res = PQexec(dbconn, dstring_data(&monquery));
194 if (PQresultStatus(res) != PGRES_TUPLES_OK)
195 {
196 slon_log(SLON_ERROR,
197 "monitorThread: \"%s\" - %s",
198 dstring_data(&monquery), PQresultErrorMessage(res));
199 PQclear(res);
200 dstring_free(&monquery);
201 break;
202 }
203 PQclear(res);
204 dstring_free(&monquery);
205 }
206
207 res = PQexec(dbconn, dstring_data(&commitquery));
208 if (PQresultStatus(res) != PGRES_COMMAND_OK)
209 {
210 slon_log(SLON_ERROR,
211 "monitorThread: %s - %s\n",
212 dstring_data(&commitquery),
213 PQresultErrorMessage(res));
214 }
215 PQclear(res);
216 }
217 if ((rc = (ScheduleStatus) sched_msleep(NULL, monitor_interval)) != SCHED_STATUS_OK)
218 {
219 break;
220 }
221 }
222 monitor_state("local_monitor", 0, (pid_t) conn->conn_pid, "just running", 0, "n/a");
223 }
224 slon_log(SLON_CONFIG, "monitorThread: exit main loop\n");
225
226 dstring_free(&beginquery);
227 dstring_free(&commitquery);
228 slon_disconnectdb(conn);
229
230 slon_log(SLON_INFO, "monitorThread: thread done\n");
231 monitor_threads = false;
232 pthread_exit(NULL);
233 return (void *) 0;
234 }
235
236 static void
stack_init(void)237 stack_init(void)
238 {
239 stack_maxlength = initial_stack_size;
240 mstack = malloc(sizeof(SlonState) * (stack_maxlength + 1));
241 if (mstack == NULL)
242 {
243 slon_log(SLON_FATAL, "stack_init() - malloc() failure could not allocate %d stack slots\n", stack_maxlength);
244 slon_retry();
245 }
246 else
247 {
248 slon_log(SLON_DEBUG2, "stack_init() - initialize stack to size %d\n", stack_maxlength);
249 }
250 stack_size = EMPTY_STACK;
251 }
252
253 void
monitor_state(const char * actor,int node,pid_t conn_pid,const char * activity,int64 event,const char * event_type)254 monitor_state(const char *actor, int node, pid_t conn_pid, /* @null@ */ const char *activity, int64 event, /* @null@ */ const char *event_type)
255 {
256 size_t len;
257 SlonState *tos;
258 SlonState *nstack;
259 char *ns;
260 pid_t mypid;
261
262 if (!monitor_threads) /* Don't collect if this thread is shut off */
263 return;
264
265 mypid = getpid();
266 pthread_mutex_lock(&stack_lock);
267 if (mstack == NULL)
268 {
269 stack_init();
270 }
271 if (stack_size >= stack_maxlength)
272 {
273 /* Need to reallocate stack */
274 if (stack_size > 100)
275 {
276 slon_log(SLON_WARN, "monitorThread: stack reallocation - size %d > warning threshold of 100. Stack perhaps isn't getting processed properly by monitoring thread\n", stack_size);
277 }
278 stack_maxlength *= 2;
279
280 nstack = realloc(mstack, (size_t) ((stack_maxlength + 1) * sizeof(SlonState)));
281 if (nstack == NULL)
282 {
283 slon_log(SLON_FATAL, "stack_init() - malloc() failure could not allocate %d stack slots\n", stack_maxlength);
284 pthread_mutex_unlock(&stack_lock);
285 slon_retry();
286 }
287 mstack = nstack;
288 }
289
290 /* if actor matches, then we can do an in-place update */
291 if (stack_size != EMPTY_STACK)
292 {
293 tos = mstack + stack_size;
294 len = strlen(actor);
295 if (strncmp(actor, tos->actor, len) == 0)
296 {
297 if (tos->actor != NULL)
298 {
299 free(tos->actor);
300 }
301 if (tos->activity != NULL)
302 {
303 free(tos->activity);
304 }
305 if (tos->event_type != NULL)
306 {
307 free(tos->event_type);
308 }
309 }
310 else
311 {
312 stack_size++;
313 }
314 }
315 else
316 {
317 stack_size++;
318 }
319 tos = mstack + stack_size;
320 tos->pid = mypid;
321 tos->node = node;
322 tos->conn_pid = conn_pid;
323 tos->event = event;
324
325 /* It might seem somewhat desirable for the database to record
326 * DB-centred timestamps, unfortunately that would only be the
327 * correct time if each thread were responsible for stowing its own
328 * activities in sl_components in the database. This would multiply
329 * database activity, and the implementation instead passes requests
330 * to a single thread that uses a single DB connection to record
331 * things, with the consequence that timestamps must be captured
332 * based on the system clock of the slon process. */
333
334 tos->start_time = time(NULL);
335 if (actor != NULL)
336 {
337 len = strlen(actor);
338 ns = malloc(sizeof(char) * len + 1);
339 if (ns)
340 {
341 strncpy(ns, actor, len);
342 ns[len] = '\0';
343 tos->actor = ns;
344 }
345 else
346 {
347 slon_log(SLON_FATAL, "monitor_state - unable to allocate memory for actor (len %d)\n", len);
348 pthread_mutex_unlock(&stack_lock);
349 slon_retry();
350 }
351 }
352 else
353 {
354 tos->actor = NULL;
355 }
356 if (activity != NULL)
357 {
358 len = strlen(activity);
359 ns = malloc(sizeof(char) * len + 1);
360 if (ns)
361 {
362 strncpy(ns, activity, len);
363 ns[len] = (char) 0;
364 tos->activity = ns;
365 }
366 else
367 {
368 slon_log(SLON_FATAL, "monitor_state - unable to allocate memory for activity (len %d)\n", len);
369 pthread_mutex_unlock(&stack_lock);
370 slon_retry();
371 }
372 }
373 else
374 {
375 tos->activity = NULL;
376 }
377 if (event_type != NULL)
378 {
379 len = strlen(event_type);
380 ns = malloc(sizeof(char) * len + 1);
381 if (ns)
382 {
383 strncpy(ns, event_type, len);
384 ns[len] = (char) 0;
385 tos->event_type = ns;
386 }
387 else
388 {
389 slon_log(SLON_FATAL, "monitor_state - unable to allocate memory for event_type (len %d)\n", len);
390 pthread_mutex_unlock(&stack_lock);
391 slon_retry();
392 }
393 }
394 else
395 {
396 tos->event_type = NULL;
397 }
398 pthread_mutex_unlock(&stack_lock);
399 }
400
401 /* Note that it is the caller's responsibility to free() the contents
402 of strings ->actor, ->activity, ->event_type */
403 static bool
stack_pop(SlonState * qentry)404 stack_pop( /* @out@ */ SlonState * qentry)
405 {
406 SlonState *ce = NULL;
407
408 pthread_mutex_lock(&stack_lock);
409 if (stack_size == EMPTY_STACK)
410 {
411 pthread_mutex_unlock(&stack_lock);
412 return false;
413 }
414 else
415 {
416 ce = mstack + stack_size;
417 qentry->actor = ce->actor;
418 qentry->pid = ce->pid;
419 qentry->node = ce->node;
420 qentry->conn_pid = ce->conn_pid;
421 qentry->activity = ce->activity;
422 qentry->event = ce->event;
423 qentry->event_type = ce->event_type;
424 qentry->start_time = ce->start_time;
425 /* entry_dump(stack_size, qentry); */
426 stack_size--;
427 pthread_mutex_unlock(&stack_lock);
428 return true;
429 }
430 }
431
#ifdef UNUSED
/*
 * stack_dump
 *
 * Log every entry currently on the monitoring stack, bottom to top, at
 * SLON_DEBUG2.  Acquires stack_lock itself.
 */
static void
stack_dump(void)
{
	int			i;

	slon_log(SLON_DEBUG2, "monitorThread: stack_dump()\n");
	pthread_mutex_lock(&stack_lock);

	/*
	 * stack_size is the index of the top entry, so the top entry itself must
	 * be included.  An EMPTY_STACK value yields no iterations.
	 */
	for (i = 0; i <= stack_size; i++)
		entry_dump(i, mstack + i);

	slon_log(SLON_DEBUG2, "monitorThread: stack_dump done\n");
	pthread_mutex_unlock(&stack_lock);
}

/*
 * entry_dump
 *
 * Log the fields of one stack entry at SLON_DEBUG2.  The caller must hold
 * stack_lock, because this reads stack contents.
 */
static void
entry_dump(int i, SlonState * tos)
{
	slon_log(SLON_DEBUG2, "stack[%d]=%p\n",
			 i, (void *) tos);
	slon_log(SLON_DEBUG2, "pid:%d node:%d connpid:%d event:%lld\n",
			 (int) tos->pid, tos->node, (int) tos->conn_pid, (long long) tos->event);
	/* The string fields may be NULL, which must not be passed to %s. */
	slon_log(SLON_DEBUG2, "actor[%s] activity[%s] event_type[%s]\n",
			 tos->actor != NULL ? tos->actor : "(null)",
			 tos->activity != NULL ? tos->activity : "(null)",
			 tos->event_type != NULL ? tos->event_type : "(null)");
}
#endif							/* UNUSED */
464