1 /*
2 ** Zabbix
3 ** Copyright (C) 2001-2021 Zabbix SIA
4 **
5 ** This program is free software; you can redistribute it and/or modify
6 ** it under the terms of the GNU General Public License as published by
7 ** the Free Software Foundation; either version 2 of the License, or
8 ** (at your option) any later version.
9 **
10 ** This program is distributed in the hope that it will be useful,
11 ** but WITHOUT ANY WARRANTY; without even the implied warranty of
12 ** MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 ** GNU General Public License for more details.
14 **
15 ** You should have received a copy of the GNU General Public License
16 ** along with this program; if not, write to the Free Software
17 ** Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
18 **/
19
20 #include "common.h"
21
22 #include "db.h"
23 #include "log.h"
24 #include "daemon.h"
25 #include "zbxself.h"
26 #include "sighandler.h"
27
28 #include "dbcache.h"
29 #include "dbsyncer.h"
30 #include "export.h"
31
32 extern int CONFIG_HISTSYNCER_FREQUENCY;
33 extern unsigned char process_type, program_type;
34 extern int server_num, process_num;
35 static sigset_t orig_mask;
36
37 /******************************************************************************
38 * *
39 * Function: zbx_db_flush_timer_queue *
40 * *
41 * Purpose: flush timer queue to the database *
42 * *
43 ******************************************************************************/
zbx_db_flush_timer_queue(void)44 static void zbx_db_flush_timer_queue(void)
45 {
46 int i;
47 zbx_vector_ptr_t persistent_timers;
48 zbx_db_insert_t db_insert;
49
50 zbx_vector_ptr_create(&persistent_timers);
51 zbx_dc_clear_timer_queue(&persistent_timers);
52
53 if (0 != persistent_timers.values_num)
54 {
55 zbx_db_insert_prepare(&db_insert, "trigger_queue", "trigger_queueid", "objectid", "type", "clock", "ns", NULL);
56
57 for (i = 0; i < persistent_timers.values_num; i++)
58 {
59 zbx_trigger_timer_t *timer = (zbx_trigger_timer_t *)persistent_timers.values[i];
60
61 zbx_db_insert_add_values(&db_insert, __UINT64_C(0), timer->objectid, timer->type,
62 timer->eval_ts.sec, timer->eval_ts.ns);
63 }
64
65 zbx_dc_free_timers(&persistent_timers);
66
67 zbx_db_insert_autoincrement(&db_insert, "trigger_queueid");
68 zbx_db_insert_execute(&db_insert);
69 zbx_db_insert_clean(&db_insert);
70 }
71
72 zbx_vector_ptr_destroy(&persistent_timers);
73 }
74
db_trigger_queue_cleanup(void)75 static void db_trigger_queue_cleanup(void)
76 {
77 DBexecute("delete from trigger_queue");
78 zbx_db_trigger_queue_unlock();
79 }
80
81 /******************************************************************************
82 * *
83 * Function: main_dbsyncer_loop *
84 * *
85 * Purpose: periodically synchronises data in memory cache with database *
86 * *
87 * Author: Alexei Vladishev *
88 * *
89 * Comments: never returns *
90 * *
91 ******************************************************************************/
ZBX_THREAD_ENTRY(dbsyncer_thread,args)92 ZBX_THREAD_ENTRY(dbsyncer_thread, args)
93 {
94 int sleeptime = -1, total_values_num = 0, values_num, more, total_triggers_num = 0, triggers_num;
95 double sec, total_sec = 0.0;
96 time_t last_stat_time;
97 char *stats = NULL;
98 const char *process_name;
99 size_t stats_alloc = 0, stats_offset = 0;
100
101 process_type = ((zbx_thread_args_t *)args)->process_type;
102 server_num = ((zbx_thread_args_t *)args)->server_num;
103 process_num = ((zbx_thread_args_t *)args)->process_num;
104
105 zabbix_log(LOG_LEVEL_INFORMATION, "%s #%d started [%s #%d]", get_program_type_string(program_type), server_num,
106 (process_name = get_process_type_string(process_type)), process_num);
107
108 update_selfmon_counter(ZBX_PROCESS_STATE_BUSY);
109
110 #define STAT_INTERVAL 5 /* if a process is busy and does not sleep then update status not faster than */
111 /* once in STAT_INTERVAL seconds */
112
113 zbx_setproctitle("%s #%d [connecting to the database]", process_name, process_num);
114 last_stat_time = time(NULL);
115
116 zbx_strcpy_alloc(&stats, &stats_alloc, &stats_offset, "started");
117
118 /* database APIs might not handle signals correctly and hang, block signals to avoid hanging */
119 zbx_block_signals(&orig_mask);
120 DBconnect(ZBX_DB_CONNECT_NORMAL);
121
122 if (1 == process_num)
123 db_trigger_queue_cleanup();
124
125 zbx_unblock_signals(&orig_mask);
126
127 if (SUCCEED == zbx_is_export_enabled(ZBX_FLAG_EXPTYPE_HISTORY))
128 zbx_history_export_init("history-syncer", process_num);
129
130 if (SUCCEED == zbx_is_export_enabled(ZBX_FLAG_EXPTYPE_TRENDS))
131 zbx_trends_export_init("history-syncer", process_num);
132
133 if (SUCCEED == zbx_is_export_enabled(ZBX_FLAG_EXPTYPE_EVENTS))
134 zbx_problems_export_init("history-syncer", process_num);
135
136 for (;;)
137 {
138 sec = zbx_time();
139 zbx_update_env(sec);
140
141 if (0 != sleeptime)
142 zbx_setproctitle("%s #%d [%s, syncing history]", process_name, process_num, stats);
143
144 /* clear timer trigger queue to avoid processing time triggers at exit */
145 if (!ZBX_IS_RUNNING())
146 zbx_log_sync_history_cache_progress();
147
148 /* database APIs might not handle signals correctly and hang, block signals to avoid hanging */
149 zbx_block_signals(&orig_mask);
150 zbx_sync_history_cache(&values_num, &triggers_num, &more);
151
152 if (!ZBX_IS_RUNNING() && SUCCEED != zbx_db_trigger_queue_locked())
153 zbx_db_flush_timer_queue();
154
155 zbx_unblock_signals(&orig_mask);
156
157 total_values_num += values_num;
158 total_triggers_num += triggers_num;
159 total_sec += zbx_time() - sec;
160
161 sleeptime = (ZBX_SYNC_MORE == more ? 0 : CONFIG_HISTSYNCER_FREQUENCY);
162
163 if (0 != sleeptime || STAT_INTERVAL <= time(NULL) - last_stat_time)
164 {
165 stats_offset = 0;
166 zbx_snprintf_alloc(&stats, &stats_alloc, &stats_offset, "processed %d values", total_values_num);
167
168 if (0 != (program_type & ZBX_PROGRAM_TYPE_SERVER))
169 {
170 zbx_snprintf_alloc(&stats, &stats_alloc, &stats_offset, ", %d triggers",
171 total_triggers_num);
172 }
173
174 zbx_snprintf_alloc(&stats, &stats_alloc, &stats_offset, " in " ZBX_FS_DBL " sec", total_sec);
175
176 if (0 == sleeptime)
177 zbx_setproctitle("%s #%d [%s, syncing history]", process_name, process_num, stats);
178 else
179 zbx_setproctitle("%s #%d [%s, idle %d sec]", process_name, process_num, stats, sleeptime);
180
181 total_values_num = 0;
182 total_triggers_num = 0;
183 total_sec = 0.0;
184 last_stat_time = time(NULL);
185 }
186
187 if (ZBX_SYNC_MORE == more)
188 continue;
189
190 if (!ZBX_IS_RUNNING())
191 break;
192
193 zbx_sleep_loop(sleeptime);
194 }
195
196 /* database APIs might not handle signals correctly and hang, block signals to avoid hanging */
197 zbx_block_signals(&orig_mask);
198 if (SUCCEED != zbx_db_trigger_queue_locked())
199 zbx_db_flush_timer_queue();
200
201 DBclose();
202 zbx_unblock_signals(&orig_mask);
203
204 zbx_log_sync_history_cache_progress();
205
206 zbx_free(stats);
207
208 exit(EXIT_SUCCESS);
209 #undef STAT_INTERVAL
210 }
211